[Libreoffice-commits] online.git: Branch 'private/Ashod/nonblocking' - net/loolnb.cpp net/socket.hpp

Michael Meeks michael.meeks at collabora.com
Tue Feb 14 11:29:51 UTC 2017


 net/loolnb.cpp |   38 ++++++++++++++++++--------------------
 net/socket.hpp |   32 ++++++++++++++++----------------
 2 files changed, 34 insertions(+), 36 deletions(-)

New commits:
commit 3f64e2b0b098e8bb88515875f79d547688752bbb
Author: Michael Meeks <michael.meeks at collabora.com>
Date:   Tue Feb 14 11:10:52 2017 +0000

    Clean up comments, naming-related warnings, etc.
    
    Avoid using 'poll' as a member function, and a local variable.
    Avoid using 'fd' as a member function, and a parameter.
    Add assertions around wake pipe.
    Always set up sockets for polling; strobing events is expected.

diff --git a/net/loolnb.cpp b/net/loolnb.cpp
index 94404b2..952c44a 100644
--- a/net/loolnb.cpp
+++ b/net/loolnb.cpp
@@ -16,6 +16,7 @@
 #include <iostream>
 #include <mutex>
 #include <thread>
+#include <assert.h>
 
 #include <Poco/Net/SocketAddress.h>
 
@@ -42,12 +43,11 @@ public:
         // Create the wakeup fd.
         if (::pipe2(_wakeup, O_CLOEXEC | O_NONBLOCK) == -1)
         {
-            //FIXME: Can't have wakeup pipe, should we exit?
+            // FIXME: Can't have wakeup pipe, should we exit?
+            // FIXME: running out of sockets should be a case we handle elegantly here - and also in our accept / ClientSocket creation I guess.
             _wakeup[0] = -1;
             _wakeup[1] = -1;
         }
-
-        createPollFds();
     }
 
     ~SocketPoll()
@@ -61,6 +61,9 @@ public:
     {
         const size_t size = _pollSockets.size();
 
+        // The events to poll on change each spin of the loop.
+        setupPollFds();
+
         int rc;
         do
         {
@@ -88,20 +91,13 @@ public:
             // Add new sockets first.
             addNewSocketsToPoll();
 
-            // Recreate the poll fds array.
-            createPollFds();
-
             // Clear the data.
             int dump;
-            if (::read(_wakeup[0], &dump, sizeof(4)) == -1)
+            if (::read(_wakeup[0], &dump, sizeof(dump)) == -1)
             {
                 // Nothing to do.
             }
         }
-        else if (_pollFds.size() != (_pollSockets.size() + 1))
-        {
-            createPollFds();
-        }
     }
 
     /// Insert a new socket to be polled.
@@ -115,7 +111,8 @@ public:
         // wakeup the main-loop.
         if (::write(_wakeup[1], "w", 1) == -1)
         {
-            // No wake up then.
+            // wakeup pipe is already full.
+            assert(errno == EAGAIN || errno == EWOULDBLOCK);
         }
     }
 
@@ -131,8 +128,8 @@ private:
         _newSockets.clear();
     }
 
-    /// Create the poll fds array.
-    void createPollFds()
+    /// Initialize the poll fds array with the right events
+    void setupPollFds()
     {
         const size_t size = _pollSockets.size();
 
@@ -140,8 +137,9 @@ private:
 
         for (size_t i = 0; i < size; ++i)
         {
-            _pollFds[i].fd = _pollSockets[i]->fd();
-            _pollFds[i].events = POLLIN | POLLOUT; //TODO: Get from the socket.
+            _pollFds[i].fd = _pollSockets[i]->getFD();
+            //TODO: Get from the socket.
+            _pollFds[i].events = POLLIN | POLLOUT;
             _pollFds[i].revents = 0;
         }
 
@@ -208,7 +206,7 @@ void client(const int timeoutMs)
         throw std::runtime_error(msg + std::strerror(errno) + ")");
     }
 
-    std::cout << "Connected " << client->fd() << std::endl;
+    std::cout << "Connected " << client->getFD() << std::endl;
 
     client->send("1", 1);
     int sent = 1;
@@ -259,7 +257,7 @@ void server(SocketPoll<ClientSocket>& poller)
                 throw std::runtime_error(msg + std::strerror(errno) + ")");
             }
 
-            std::cout << "Accepted client #" << clientSocket->fd() << std::endl;
+            std::cout << "Accepted client #" << clientSocket->getFD() << std::endl;
             poller.insertNewSocket(clientSocket);
         }
     }
@@ -288,7 +286,7 @@ void pollAndComm(SocketPoll<ClientSocket>& poller, std::atomic<bool>& stop)
                     const int num = stoi(msg);
                     if ((num % (1<<16)) == 1)
                     {
-                        std::cout << "Client #" << socket->fd() << ": " << msg << std::endl;
+                        std::cout << "Client #" << socket->getFD() << ": " << msg << std::endl;
                     }
                     const std::string new_msg = std::to_string(num + 1);
                     const int sent = socket->send(new_msg.data(), new_msg.size());
@@ -301,7 +299,7 @@ void pollAndComm(SocketPoll<ClientSocket>& poller, std::atomic<bool>& stop)
                 else
                 {
                     // Normally we'd buffer the response, but for now...
-                    std::cerr << "Client #" << socket->fd()
+                    std::cerr << "Client #" << socket->getFD()
                             << ": ERROR - socket not ready for write." << std::endl;
                 }
             }
diff --git a/net/socket.hpp b/net/socket.hpp
index b2891a3..83202b4 100644
--- a/net/socket.hpp
+++ b/net/socket.hpp
@@ -38,7 +38,7 @@ public:
     }
 
     // Returns the OS native socket fd.
-    int fd() const { return _fd; }
+    int getFD() const { return _fd; }
 
     /// Sets the send buffer in size bytes.
     /// Must be called before accept or connect.
@@ -114,18 +114,18 @@ public:
         // Use poll(2) as it has lower overhead for up to
         // a few hundred sockets compared to epoll(2).
         // Also it has a more intuitive API and portable.
-        pollfd poll;
-        memset(&poll, 0, sizeof(poll));
+        pollfd pollfd;
+        memset(&pollfd, 0, sizeof(pollfd));
 
-        poll.fd = fd();
-        poll.events |= events;
+        pollfd.fd = getFD();
+        pollfd.events |= events;
 
         int rc;
         do
         {
             // Technically, on retrying we should wait
             // the _remaining_ time, alas simplicity wins.
-            rc = ::poll(&poll, 1, timeoutMs);
+            rc = ::poll(&pollfd, 1, timeoutMs);
         }
         while (rc < 0 && errno == EINTR);
 
@@ -137,19 +137,19 @@ public:
         int revents = 0;
         if (rc == 1)
         {
-            if (poll.revents & (POLLERR|POLLHUP|POLLNVAL))
+            if (pollfd.revents & (POLLERR|POLLHUP|POLLNVAL))
             {
                 // Probe socket for error.
                 return -1;
             }
 
-            if (poll.revents & (POLLIN|POLLPRI))
+            if (pollfd.revents & (POLLIN|POLLPRI))
             {
                 // Data ready to read.
                 revents |= POLLIN;
             }
 
-            if (poll.revents & POLLOUT)
+            if (pollfd.revents & POLLOUT)
             {
                 // Ready for write.
                 revents |= POLLOUT;
@@ -206,7 +206,7 @@ public:
     /// only when the latter returns 0 we are connected.
     bool connect(const Poco::Net::SocketAddress& address, const int timeoutMs = 0)
     {
-        const int rc = ::connect(fd(), address.addr(), address.length());
+        const int rc = ::connect(getFD(), address.addr(), address.length());
         if (rc == 0)
         {
             return true;
@@ -229,7 +229,7 @@ public:
     int send(const void* buf, const size_t len)
     {
         // Don't SIGPIPE when the other end closes.
-        const int rc = ::send(fd(), buf, len, MSG_NOSIGNAL);
+        const int rc = ::send(getFD(), buf, len, MSG_NOSIGNAL);
         return rc;
     }
 
@@ -238,7 +238,7 @@ public:
     /// and 0 when the peer has performed an orderly shutdown.
     int recv(void* buf, const size_t len)
     {
-        const int rc = ::recv(fd(), buf, len, 0);
+        const int rc = ::recv(getFD(), buf, len, 0);
         return rc;
     }
 
@@ -266,9 +266,9 @@ public:
         //TODO: Might be worth refactoring out.
         const int reuseAddress = 1;
         constexpr unsigned int len = sizeof(reuseAddress);
-        ::setsockopt(fd(), SOL_SOCKET, SO_REUSEADDR, &reuseAddress, len);
+        ::setsockopt(getFD(), SOL_SOCKET, SO_REUSEADDR, &reuseAddress, len);
 
-        const int rc = ::bind(fd(), address.addr(), address.length());
+        const int rc = ::bind(getFD(), address.addr(), address.length());
         return (rc == 0);
     }
 
@@ -277,7 +277,7 @@ public:
     /// Returns true on success only.
     bool listen(const int backlog = 64)
     {
-        const int rc = ::listen(fd(), backlog);
+        const int rc = ::listen(getFD(), backlog);
         return (rc == 0);
     }
 
@@ -288,7 +288,7 @@ public:
     {
         // Accept a connection (if any) and set it to non-blocking.
         // We don't care about the client's address, so ignored.
-        const int rc = ::accept4(fd(), nullptr, nullptr, SOCK_NONBLOCK);
+        const int rc = ::accept4(getFD(), nullptr, nullptr, SOCK_NONBLOCK);
         return std::shared_ptr<ClientSocket>(rc != -1 ? new ClientSocket(rc) : nullptr);
     }
 };


More information about the Libreoffice-commits mailing list