[systemd-commits] 2 commits - src/libsystemd-rtnl src/network src/systemd

Tom Gundersen tomegun at kemper.freedesktop.org
Wed Nov 13 10:58:24 PST 2013


 src/libsystemd-rtnl/rtnl-internal.h |   13 +
 src/libsystemd-rtnl/rtnl-message.c  |    2 
 src/libsystemd-rtnl/rtnl-util.c     |    4 
 src/libsystemd-rtnl/sd-rtnl.c       |  300 +++++++++++++++++++++++++++++-------
 src/libsystemd-rtnl/test-rtnl.c     |    8 
 src/network/networkd-address.c      |    2 
 src/network/networkd-link.c         |    2 
 src/network/networkd-route.c        |    2 
 src/systemd/sd-rtnl.h               |    7 
 9 files changed, 275 insertions(+), 65 deletions(-)

New commits:
commit 4555ec72d6530fce4c978fd894ac22f7e006b0ee
Author: Tom Gundersen <teg at jklm.no>
Date:   Tue Nov 12 22:37:51 2013 +0100

    rtnl: start adding support for asynchronous messaging
    
    Similarly to sd-bus, add:
    
    sd_rtnl_wait
    sd_rtnl_process
    sd_rtnl_send
    
    and adapt sd_rtnl_call accordingly.

diff --git a/src/libsystemd-rtnl/rtnl-internal.h b/src/libsystemd-rtnl/rtnl-internal.h
index b05290f..03297bb 100644
--- a/src/libsystemd-rtnl/rtnl-internal.h
+++ b/src/libsystemd-rtnl/rtnl-internal.h
@@ -35,6 +35,14 @@ struct sd_rtnl {
                 struct sockaddr_nl nl;
         } sockaddr;
 
+        sd_rtnl_message **rqueue;
+        unsigned rqueue_size;
+
+        sd_rtnl_message **wqueue;
+        unsigned wqueue_size;
+
+        bool processing:1;
+
         uint32_t serial;
 
         pid_t original_pid;
@@ -42,8 +50,11 @@ struct sd_rtnl {
 
 #define RTNL_DEFAULT_TIMEOUT ((usec_t) (10 * USEC_PER_SEC))
 
+#define RTNL_WQUEUE_MAX 1024
+#define RTNL_RQUEUE_MAX 64*1024
+
 int message_get_errno(sd_rtnl_message *m);
-int message_get_serial(sd_rtnl_message *m);
+uint32_t message_get_serial(sd_rtnl_message *m);
 int message_seal(sd_rtnl *nl, sd_rtnl_message *m);
 int socket_write_message(sd_rtnl *nl, sd_rtnl_message *m);
 int socket_read_message(sd_rtnl *nl, sd_rtnl_message **ret);
diff --git a/src/libsystemd-rtnl/rtnl-message.c b/src/libsystemd-rtnl/rtnl-message.c
index 941bd96..f7ff0a0 100644
--- a/src/libsystemd-rtnl/rtnl-message.c
+++ b/src/libsystemd-rtnl/rtnl-message.c
@@ -367,7 +367,7 @@ int sd_rtnl_message_read(sd_rtnl_message *m, unsigned short *type, void **data)
         return message_read(m, type, data);
 }
 
-int message_get_serial(sd_rtnl_message *m) {
+uint32_t message_get_serial(sd_rtnl_message *m) {
         assert(m);
 
         return m->hdr->nlmsg_seq;
diff --git a/src/libsystemd-rtnl/sd-rtnl.c b/src/libsystemd-rtnl/sd-rtnl.c
index eb3b01b..b375576 100644
--- a/src/libsystemd-rtnl/sd-rtnl.c
+++ b/src/libsystemd-rtnl/sd-rtnl.c
@@ -46,6 +46,14 @@ static int sd_rtnl_new(sd_rtnl **ret) {
 
         rtnl->original_pid = getpid();
 
+        /* We guarantee that wqueue always has space for at least
+         * one entry */
+        rtnl->wqueue = new(sd_rtnl_message*, 1);
+        if (!rtnl->wqueue) {
+                free(rtnl);
+                return -ENOMEM;
+        }
+
         *ret = rtnl;
         return 0;
 }
@@ -98,23 +106,31 @@ sd_rtnl *sd_rtnl_ref(sd_rtnl *rtnl) {
 }
 
 sd_rtnl *sd_rtnl_unref(sd_rtnl *rtnl) {
+
         if (rtnl && REFCNT_DEC(rtnl->n_ref) <= 0) {
+                unsigned i;
+
+                for (i = 0; i < rtnl->rqueue_size; i++)
+                        sd_rtnl_message_unref(rtnl->rqueue[i]);
+                free(rtnl->rqueue);
+
+                for (i = 0; i < rtnl->wqueue_size; i++)
+                        sd_rtnl_message_unref(rtnl->wqueue[i]);
+                free(rtnl->wqueue);
+
                 if (rtnl->fd >= 0)
                         close_nointr_nofail(rtnl->fd);
+
                 free(rtnl);
         }
 
         return NULL;
 }
 
-int sd_rtnl_call(sd_rtnl *nl,
-                sd_rtnl_message *message,
-                uint64_t usec,
-                sd_rtnl_message **ret) {
-        struct pollfd p[1] = {};
-        struct timespec left;
-        usec_t timeout;
-        int r, serial;
+int sd_rtnl_send(sd_rtnl *nl,
+                 sd_rtnl_message *message,
+                 uint32_t *serial) {
+        int r;
 
         assert_return(nl, -EINVAL);
         assert_return(!rtnl_pid_changed(nl), -ECHILD);
@@ -124,82 +140,260 @@ int sd_rtnl_call(sd_rtnl *nl,
         if (r < 0)
                 return r;
 
-        serial = message_get_serial(message);
+        if (nl->wqueue_size <= 0) {
+                /* send directly */
+                r = socket_write_message(nl, message);
+                if (r < 0)
+                        return r;
+                else if (r == 0) {
+                        /* nothing was sent, so let's put it on
+                         * the queue */
+                        nl->wqueue[0] = sd_rtnl_message_ref(message);
+                        nl->wqueue_size = 1;
+                }
+        } else {
+                sd_rtnl_message **q;
 
-        p[0].fd = nl->fd;
-        p[0].events = POLLOUT;
+                /* append to queue */
+                if (nl->wqueue_size >= RTNL_WQUEUE_MAX)
+                        return -ENOBUFS;
 
-        if (usec == (uint64_t) -1)
-                timeout = 0;
-        else if (usec == 0)
-                timeout = now(CLOCK_MONOTONIC) + RTNL_DEFAULT_TIMEOUT;
-        else
-                timeout = now(CLOCK_MONOTONIC) + usec;
+                q = realloc(nl->wqueue, sizeof(sd_rtnl_message*) * (nl->wqueue_size + 1));
+                if (!q)
+                        return -ENOMEM;
 
-        for (;;) {
-                if (timeout) {
-                        usec_t n;
+                nl->wqueue = q;
+                q[nl->wqueue_size ++] = sd_rtnl_message_ref(message);
+        }
 
-                        n = now(CLOCK_MONOTONIC);
-                        if (n >= timeout)
-                                return -ETIMEDOUT;
+        if (serial)
+                *serial = message_get_serial(message);
 
-                        timespec_store(&left, timeout - n);
-                }
+        return 1;
+}
 
-                r = ppoll(p, 1, timeout ? &left : NULL, NULL);
-                if (r < 0)
-                        return 0;
+static int dispatch_rqueue(sd_rtnl *rtnl, sd_rtnl_message **message) {
+        sd_rtnl_message *z = NULL;
+        int r;
 
-                r = socket_write_message(nl, message);
+        assert(rtnl);
+        assert(message);
+
+        if (rtnl->rqueue_size > 0) {
+                /* Dispatch a queued message */
+
+                *message = rtnl->rqueue[0];
+                rtnl->rqueue_size --;
+                memmove(rtnl->rqueue, rtnl->rqueue + 1, sizeof(sd_rtnl_message*) * rtnl->rqueue_size);
+
+                return 1;
+        }
+
+        /* Try to read a new message */
+        r = socket_read_message(rtnl, &z);
+        if (r < 0)
+                return r;
+        if (r == 0)
+                return 0;
+
+        *message = z;
+
+        return 1;
+}
+
+static int dispatch_wqueue(sd_rtnl *rtnl) {
+        int r, ret = 0;
+
+        assert(rtnl);
+
+        while (rtnl->wqueue_size > 0) {
+                r = socket_write_message(rtnl, rtnl->wqueue[0]);
                 if (r < 0)
                         return r;
-
-                if (r > 0) {
-                        break;
+                else if (r == 0)
+                        /* Didn't do anything this time */
+                        return ret;
+                else {
+                        /* see equivalent in sd-bus.c */
+                        sd_rtnl_message_unref(rtnl->wqueue[0]);
+                        rtnl->wqueue_size --;
+                        memmove(rtnl->wqueue, rtnl->wqueue + 1, sizeof(sd_rtnl_message*) * rtnl->wqueue_size);
+
+                        ret = 1;
                 }
         }
 
+        return ret;
+}
+
+static int process_running(sd_rtnl *rtnl, sd_rtnl_message **ret) {
+        _cleanup_sd_rtnl_message_unref_ sd_rtnl_message *m = NULL;
+        int r;
+
+        r = dispatch_wqueue(rtnl);
+        if (r != 0)
+                goto null_message;
+
+        r = dispatch_rqueue(rtnl, &m);
+        if (r < 0)
+                return r;
+        if (!m)
+                goto null_message;
+
+        if (ret) {
+                *ret = m;
+                m = NULL;
+
+                return 1;
+        }
+
+        return 1;
+
+null_message:
+        if (r >= 0 && ret)
+                *ret = NULL;
+
+        return r;
+}
+int sd_rtnl_process(sd_rtnl *rtnl, sd_rtnl_message **ret) {
+        int r;
+
+        assert_return(rtnl, -EINVAL);
+        assert_return(!rtnl_pid_changed(rtnl), -ECHILD);
+        assert_return(!rtnl->processing, -EBUSY);
+
+        rtnl->processing = true;
+        r = process_running(rtnl, ret);
+        rtnl->processing = false;
+
+        return r;
+}
+
+static usec_t calc_elapse(uint64_t usec) {
+        if (usec == (uint64_t) -1)
+                return 0;
+
+        if (usec == 0)
+                usec = RTNL_DEFAULT_TIMEOUT;
+
+        return now(CLOCK_MONOTONIC) + usec;
+}
+
+static int rtnl_poll(sd_rtnl *nl, uint64_t timeout_usec) {
+        struct pollfd p[1] = {};
+        struct timespec ts;
+        int r;
+
+        assert(nl);
+
+        p[0].fd = nl->fd;
         p[0].events = POLLIN;
 
+        r = ppoll(p, 1, timeout_usec == (uint64_t) -1 ? NULL :
+                        timespec_store(&ts, timeout_usec), NULL);
+        if (r < 0)
+                return -errno;
+
+        return r > 0 ? 1 : 0;
+}
+
+int sd_rtnl_wait(sd_rtnl *nl, uint64_t timeout_usec) {
+        assert_return(nl, -EINVAL);
+        assert_return(!rtnl_pid_changed(nl), -ECHILD);
+
+        if (nl->rqueue_size > 0)
+                return 0;
+
+        return rtnl_poll(nl, timeout_usec);
+}
+
+int sd_rtnl_call(sd_rtnl *nl,
+                sd_rtnl_message *message,
+                uint64_t usec,
+                sd_rtnl_message **ret) {
+        usec_t timeout;
+        uint32_t serial;
+        bool room = false;
+        int r;
+
+        assert_return(nl, -EINVAL);
+        assert_return(!rtnl_pid_changed(nl), -ECHILD);
+        assert_return(message, -EINVAL);
+
+        r = sd_rtnl_send(nl, message, &serial);
+        if (r < 0)
+                return r;
+
+        timeout = calc_elapse(usec);
+
         for (;;) {
-                _cleanup_sd_rtnl_message_unref_ sd_rtnl_message *reply = NULL;
+                usec_t left;
+                _cleanup_sd_rtnl_message_unref_ sd_rtnl_message *incoming = NULL;
 
-                if (timeout) {
-                        usec_t n;
+                if (!room) {
+                        sd_rtnl_message **q;
 
-                        n = now(CLOCK_MONOTONIC);
-                        if (n >= timeout)
-                                return -ETIMEDOUT;
+                        if (nl->rqueue_size >= RTNL_RQUEUE_MAX)
+                                return -ENOBUFS;
 
-                        timespec_store(&left, timeout - n);
-                }
+                        /* Make sure there's room for queueing this
+                         * locally, before we read the message */
 
-                r = ppoll(p, 1, timeout ? &left : NULL, NULL);
-                if (r < 0)
-                        return r;
+                        q = realloc(nl->rqueue, (nl->rqueue_size + 1) * sizeof(sd_rtnl_message*));
+                        if (!q)
+                                return -ENOMEM;
 
-                r = socket_read_message(nl, &reply);
+                        nl->rqueue = q;
+                        room = true;
+                }
+
+                r = socket_read_message(nl, &incoming);
                 if (r < 0)
                         return r;
-
-                if (r > 0) {
-                        int received_serial = message_get_serial(reply);
+                if (incoming) {
+                        uint32_t received_serial = message_get_serial(incoming);
 
                         if (received_serial == serial) {
-                                r = message_get_errno(reply);
+                                r = message_get_errno(incoming);
                                 if (r < 0)
                                         return r;
 
                                 if (ret) {
-                                        *ret = reply;
-                                        reply = NULL;
+                                        *ret = incoming;
+                                        incoming = NULL;
                                 }
 
-                                break;;
+                                return 1;
                         }
+
+                        /* Room was allocated on the queue above */
+                        nl->rqueue[nl->rqueue_size ++] = incoming;
+                        incoming = NULL;
+                        room = false;
+
+                        /* Try to read more, right away */
+                        continue;
                 }
-        }
+                if (r != 0)
+                        continue;
 
-        return 0;
+                if (timeout > 0) {
+                        usec_t n;
+
+                        n = now(CLOCK_MONOTONIC);
+                        if (n >= timeout)
+                                return -ETIMEDOUT;
+
+                        left = timeout - n;
+                } else
+                        left = (uint64_t) -1;
+
+                r = rtnl_poll(nl, left);
+                if (r < 0)
+                        return r;
+
+                r = dispatch_wqueue(nl);
+                if (r < 0)
+                        return r;
+        }
 }
diff --git a/src/libsystemd-rtnl/test-rtnl.c b/src/libsystemd-rtnl/test-rtnl.c
index 61345bc..3615086 100644
--- a/src/libsystemd-rtnl/test-rtnl.c
+++ b/src/libsystemd-rtnl/test-rtnl.c
@@ -53,7 +53,7 @@ static void test_link_configure(sd_rtnl *rtnl, int ifindex) {
         assert(type == IFLA_MTU);
         assert(mtu == *(unsigned int *) data);
 
-        assert(sd_rtnl_call(rtnl, message, 0, NULL) == 0);
+        assert(sd_rtnl_call(rtnl, message, 0, NULL) == 1);
 }
 
 static void test_route(void) {
@@ -133,7 +133,7 @@ int main(void) {
 
         assert(sd_rtnl_message_read(m, &type, &data) == 0);
 
-        assert(sd_rtnl_call(rtnl, m, 0, &r) >= 0);
+        assert(sd_rtnl_call(rtnl, m, 0, &r) == 1);
         assert(sd_rtnl_message_get_type(r, &type) >= 0);
         assert(type == RTM_NEWLINK);
 
@@ -155,7 +155,7 @@ int main(void) {
 
         assert(sd_rtnl_message_read(m, &type, &data) == 0);
 
-        assert(sd_rtnl_call(rtnl, m, -1, &r) >= 0);
+        assert(sd_rtnl_call(rtnl, m, -1, &r) == 1);
         while (sd_rtnl_message_read(r, &type, &data) > 0) {
                 switch (type) {
 //                        case IFLA_MTU:
diff --git a/src/systemd/sd-rtnl.h b/src/systemd/sd-rtnl.h
index 2d166c4..87acc31 100644
--- a/src/systemd/sd-rtnl.h
+++ b/src/systemd/sd-rtnl.h
@@ -37,9 +37,13 @@ int sd_rtnl_open(uint32_t groups, sd_rtnl **nl);
 sd_rtnl *sd_rtnl_ref(sd_rtnl *nl);
 sd_rtnl *sd_rtnl_unref(sd_rtnl *nl);
 
+int sd_rtnl_send(sd_rtnl *nl, sd_rtnl_message *message, uint32_t *serial);
 int sd_rtnl_call(sd_rtnl *nl, sd_rtnl_message *message, uint64_t timeout,
                  sd_rtnl_message **reply);
 
+int sd_rtnl_process(sd_rtnl *nl, sd_rtnl_message **ret);
+int sd_rtnl_wait(sd_rtnl *nl, uint64_t timeout);
+
 /* messages */
 int sd_rtnl_message_link_new(uint16_t msg_type, int index, unsigned int type,
                              unsigned int flags, sd_rtnl_message **ret);

commit fe4824e065765f4536c84916694bb050c4a5d0af
Author: Tom Gundersen <teg at jklm.no>
Date:   Mon Nov 11 18:55:34 2013 +0100

    rtnl: rename rtnl_bus_send_with_reply_and_block() to rtnl_bus_call()
    
    Follow the equivalent rename in sd-bus to stay as similar as possible.

diff --git a/src/libsystemd-rtnl/rtnl-util.c b/src/libsystemd-rtnl/rtnl-util.c
index 9707aa0..d40858a 100644
--- a/src/libsystemd-rtnl/rtnl-util.c
+++ b/src/libsystemd-rtnl/rtnl-util.c
@@ -42,7 +42,7 @@ int rtnl_set_link_name(sd_rtnl *rtnl, int ifindex, const char *name) {
         if (r < 0)
                 return r;
 
-        r = sd_rtnl_send_with_reply_and_block(rtnl, message, 0, NULL);
+        r = sd_rtnl_call(rtnl, message, 0, NULL);
         if (r < 0)
                 return r;
 
@@ -81,7 +81,7 @@ int rtnl_set_link_properties(sd_rtnl *rtnl, int ifindex, const struct ether_addr
         }
 
         if  (need_update) {
-                r = sd_rtnl_send_with_reply_and_block(rtnl, message, 0, NULL);
+                r = sd_rtnl_call(rtnl, message, 0, NULL);
                 if (r < 0)
                         return r;
         }
diff --git a/src/libsystemd-rtnl/sd-rtnl.c b/src/libsystemd-rtnl/sd-rtnl.c
index 8ea11df..eb3b01b 100644
--- a/src/libsystemd-rtnl/sd-rtnl.c
+++ b/src/libsystemd-rtnl/sd-rtnl.c
@@ -107,7 +107,7 @@ sd_rtnl *sd_rtnl_unref(sd_rtnl *rtnl) {
         return NULL;
 }
 
-int sd_rtnl_send_with_reply_and_block(sd_rtnl *nl,
+int sd_rtnl_call(sd_rtnl *nl,
                 sd_rtnl_message *message,
                 uint64_t usec,
                 sd_rtnl_message **ret) {
diff --git a/src/libsystemd-rtnl/test-rtnl.c b/src/libsystemd-rtnl/test-rtnl.c
index 912cc66..61345bc 100644
--- a/src/libsystemd-rtnl/test-rtnl.c
+++ b/src/libsystemd-rtnl/test-rtnl.c
@@ -53,7 +53,7 @@ static void test_link_configure(sd_rtnl *rtnl, int ifindex) {
         assert(type == IFLA_MTU);
         assert(mtu == *(unsigned int *) data);
 
-        assert(sd_rtnl_send_with_reply_and_block(rtnl, message, 0, NULL) == 0);
+        assert(sd_rtnl_call(rtnl, message, 0, NULL) == 0);
 }
 
 static void test_route(void) {
@@ -133,14 +133,14 @@ int main(void) {
 
         assert(sd_rtnl_message_read(m, &type, &data) == 0);
 
-        assert(sd_rtnl_send_with_reply_and_block(rtnl, m, 0, &r) >= 0);
+        assert(sd_rtnl_call(rtnl, m, 0, &r) >= 0);
         assert(sd_rtnl_message_get_type(r, &type) >= 0);
         assert(type == RTM_NEWLINK);
 
         assert(sd_rtnl_message_read(m, &type, &data) == 0);
         assert((r = sd_rtnl_message_unref(r)) == NULL);
 
-        assert(sd_rtnl_send_with_reply_and_block(rtnl, m, -1, &r) == -EPERM);
+        assert(sd_rtnl_call(rtnl, m, -1, &r) == -EPERM);
         assert((m = sd_rtnl_message_unref(m)) == NULL);
         assert((r = sd_rtnl_message_unref(r)) == NULL);
 
@@ -155,7 +155,7 @@ int main(void) {
 
         assert(sd_rtnl_message_read(m, &type, &data) == 0);
 
-        assert(sd_rtnl_send_with_reply_and_block(rtnl, m, -1, &r) >= 0);
+        assert(sd_rtnl_call(rtnl, m, -1, &r) >= 0);
         while (sd_rtnl_message_read(r, &type, &data) > 0) {
                 switch (type) {
 //                        case IFLA_MTU:
diff --git a/src/network/networkd-address.c b/src/network/networkd-address.c
index 9a7106e..e02b1df 100644
--- a/src/network/networkd-address.c
+++ b/src/network/networkd-address.c
@@ -98,7 +98,7 @@ int address_configure(Manager *manager, Address *address, Link *link) {
                 }
         }
 
-        r = sd_rtnl_send_with_reply_and_block(manager->rtnl, req, 0, NULL);
+        r = sd_rtnl_call(manager->rtnl, req, 0, NULL);
         if (r < 0) {
                 log_error("Could not configure address: %s", strerror(-r));
                 return r != -EEXIST ? r : 0;
diff --git a/src/network/networkd-link.c b/src/network/networkd-link.c
index 486d4de..085b8db 100644
--- a/src/network/networkd-link.c
+++ b/src/network/networkd-link.c
@@ -107,7 +107,7 @@ int link_up(Manager *manager, Link *link) {
                 return r;
         }
 
-        r = sd_rtnl_send_with_reply_and_block(manager->rtnl, req, 0, NULL);
+        r = sd_rtnl_call(manager->rtnl, req, 0, NULL);
         if (r < 0) {
                 log_error("Could not UP link: %s", strerror(-r));
                 return r;
diff --git a/src/network/networkd-route.c b/src/network/networkd-route.c
index 918a1d0..3158616 100644
--- a/src/network/networkd-route.c
+++ b/src/network/networkd-route.c
@@ -83,7 +83,7 @@ int route_configure(Manager *manager, Route *route, Link *link) {
                 return r;
         }
 
-        r = sd_rtnl_send_with_reply_and_block(manager->rtnl, req, 0, NULL);
+        r = sd_rtnl_call(manager->rtnl, req, 0, NULL);
         if (r < 0) {
                 log_error("Could not configure route: %s", strerror(-r));
                 return r;
diff --git a/src/systemd/sd-rtnl.h b/src/systemd/sd-rtnl.h
index 223f403..2d166c4 100644
--- a/src/systemd/sd-rtnl.h
+++ b/src/systemd/sd-rtnl.h
@@ -37,7 +37,8 @@ int sd_rtnl_open(uint32_t groups, sd_rtnl **nl);
 sd_rtnl *sd_rtnl_ref(sd_rtnl *nl);
 sd_rtnl *sd_rtnl_unref(sd_rtnl *nl);
 
-int sd_rtnl_send_with_reply_and_block(sd_rtnl *nl, sd_rtnl_message *message, uint64_t timeout, sd_rtnl_message **reply);
+int sd_rtnl_call(sd_rtnl *nl, sd_rtnl_message *message, uint64_t timeout,
+                 sd_rtnl_message **reply);
 
 /* messages */
 int sd_rtnl_message_link_new(uint16_t msg_type, int index, unsigned int type,



More information about the systemd-commits mailing list