[Libreoffice-commits] online.git: Branch 'private/Ashod/nonblocking' - net/loolnb.cpp net/socket.hpp

Michael Meeks michael.meeks at collabora.com
Wed Feb 15 14:49:27 UTC 2017


 net/loolnb.cpp |  250 ++++++++++++++++------------------------------------
 net/socket.hpp |  273 +++++++++++++++++++++++++++++++--------------------------
 2 files changed, 231 insertions(+), 292 deletions(-)

New commits:
commit ae35938a9dac2fe632b32b072a9cb5469bdbbdb7
Author: Michael Meeks <michael.meeks at collabora.com>
Date:   Wed Feb 15 14:48:48 2017 +0000

    De-templatize and simplify.

diff --git a/net/loolnb.cpp b/net/loolnb.cpp
index da7b17a..bffc684 100644
--- a/net/loolnb.cpp
+++ b/net/loolnb.cpp
@@ -52,147 +52,6 @@ public:
     }
 };
 
-/// Handles non-blocking socket event polling.
-/// Only polls on N-Sockets and invokes callback and
-/// doesn't manage buffers or client data.
-/// Note: uses poll(2) since it has very good performance
-/// compared to epoll up to a few hundred sockets and
-/// doesn't suffer select(2)'s poor API. Since this will
-/// be used per-document we don't expect to have several
-/// hundred users on same document to suffer poll(2)'s
-/// scalability limit. Meanwhile, epoll(2)'s high
-/// overhead to adding/removing sockets is not helpful.
-template <typename T>
-class SocketPoll
-{
-public:
-    SocketPoll()
-    {
-        // Create the wakeup fd.
-        if (::pipe2(_wakeup, O_CLOEXEC | O_NONBLOCK) == -1)
-        {
-            // FIXME: Can't have wakeup pipe, should we exit?
-            // FIXME: running out of sockets should be a case we handle elegantly here - and also in our accept / ClientSocket creation I guess.
-            _wakeup[0] = -1;
-            _wakeup[1] = -1;
-        }
-    }
-
-    ~SocketPoll()
-    {
-        ::close(_wakeup[0]);
-        ::close(_wakeup[1]);
-    }
-
-    /// Poll the sockets for available data to read or buffer to write.
-    void poll(const int timeoutMs, const std::function<bool(const std::shared_ptr<T>&, const int)>& handler)
-    {
-        const size_t size = _pollSockets.size();
-
-        // The events to poll on change each spin of the loop.
-        setupPollFds();
-
-        int rc;
-        do
-        {
-            rc = ::poll(&_pollFds[0], size + 1, timeoutMs);
-        }
-        while (rc < 0 && errno == EINTR);
-
-        // Fire the callback and remove dead fds.
-        for (int i = static_cast<int>(size) - 1; i >= 0; --i)
-        {
-            if (_pollFds[i].revents)
-            {
-                if (!handler(_pollSockets[i], _pollFds[i].revents))
-                {
-                    std::cout << "Removing: " << _pollFds[i].fd << std::endl;
-                    _pollSockets.erase(_pollSockets.begin() + i);
-                    // Don't remove from pollFds; we'll recreate below.
-                }
-            }
-        }
-
-        // Process the wakeup pipe (always the last entry).
-        if (_pollFds[size].revents)
-        {
-            // Add new sockets first.
-            addNewSocketsToPoll();
-
-            // Clear the data.
-            int dump;
-            if (::read(_wakeup[0], &dump, sizeof(dump)) == -1)
-            {
-                // Nothing to do.
-            }
-        }
-    }
-
-    /// Insert a new socket to be polled.
-    /// Sockets are removed only when the handler return false.
-    void insertNewSocket(const std::shared_ptr<T>& newSocket)
-    {
-        std::lock_guard<std::mutex> lock(_mutex);
-
-        _newSockets.emplace_back(newSocket);
-
-        // wakeup the main-loop.
-        if (::write(_wakeup[1], "w", 1) == -1)
-        {
-            // wakeup pipe is already full.
-            assert(errno == EAGAIN || errno == EWOULDBLOCK);
-        }
-    }
-
-private:
-
-    /// Add the new sockets to list of those to poll.
-    void addNewSocketsToPoll()
-    {
-        std::lock_guard<std::mutex> lock(_mutex);
-
-        // Copy the new sockets over and clear.
-        _pollSockets.insert(_pollSockets.end(), _newSockets.begin(), _newSockets.end());
-        _newSockets.clear();
-    }
-
-    void removeSocketFromPoll(const std::shared_ptr<T>& socket)
-    {
-        _pollSockets.erase(_pollSockets.find(socket));
-    }
-
-    /// Initialize the poll fds array with the right events
-    void setupPollFds()
-    {
-        const size_t size = _pollSockets.size();
-
-        _pollFds.resize(size + 1); // + wakeup pipe
-
-        for (size_t i = 0; i < size; ++i)
-        {
-            _pollFds[i].fd = _pollSockets[i]->getFD();
-            _pollFds[i].events = _pollSockets[i]->getPollEvents();
-            _pollFds[i].revents = 0;
-        }
-
-        // Add the read-end of the wake pipe.
-        _pollFds[size].fd = _wakeup[0];
-        _pollFds[size].events = POLLIN;
-        _pollFds[size].revents = 0;
-    }
-
-private:
-    /// main-loop wakeup pipe
-    int _wakeup[2];
-    /// The sockets we're controlling
-    std::vector<std::shared_ptr<T>> _pollSockets;
-    /// Protects _newSockets
-    std::mutex _mutex;
-    std::vector<std::shared_ptr<T>> _newSockets;
-    /// The fds to poll.
-    std::vector<pollfd> _pollFds;
-};
-
 /// Generic thread class.
 class Thread
 {
@@ -229,10 +88,78 @@ private:
 
 Poco::Net::SocketAddress addr("127.0.0.1", PortNumber);
 
-void server(SocketPoll<SimpleResponseClient>& poller)
+/// A non-blocking, streaming socket.
+class ServerSocket : public Socket
+{
+    SocketPoll& _clientPoller;
+public:
+    ServerSocket(SocketPoll& clientPoller)
+        : _clientPoller(clientPoller)
+    {
+    }
+
+    /// Binds to a local address (Servers only).
+    /// Does not retry on error.
+    /// Returns true on success only.
+    bool bind(const Poco::Net::SocketAddress& address)
+    {
+        // Enable address reuse to avoid stalling after
+        // recycling, when previous socket is TIME_WAIT.
+        //TODO: Might be worth refactoring out.
+        const int reuseAddress = 1;
+        constexpr unsigned int len = sizeof(reuseAddress);
+        ::setsockopt(getFD(), SOL_SOCKET, SO_REUSEADDR, &reuseAddress, len);
+
+        const int rc = ::bind(getFD(), address.addr(), address.length());
+        return (rc == 0);
+    }
+
+    /// Listen to incoming connections (Servers only).
+    /// Does not retry on error.
+    /// Returns true on success only.
+    bool listen(const int backlog = 64)
+    {
+        const int rc = ::listen(getFD(), backlog);
+        return (rc == 0);
+    }
+
+    /// Accepts an incoming connection (Servers only).
+    /// Does not retry on error.
+    /// Returns a valid Socket shared_ptr on success only.
+    template <typename T>
+       std::shared_ptr<T> accept()
+    {
+        // Accept a connection (if any) and set it to non-blocking.
+        // We don't care about the client's address, so ignored.
+        const int rc = ::accept4(getFD(), nullptr, nullptr, SOCK_NONBLOCK);
+        return std::shared_ptr<T>(rc != -1 ? new T(rc) : nullptr);
+    }
+
+    int getPollEvents() override
+    {
+        return POLLIN;
+    }
+
+    HandleResult handlePoll( int /* events */ ) override
+    {
+        std::shared_ptr<SimpleResponseClient> clientSocket = accept<SimpleResponseClient>();
+        if (!clientSocket)
+        {
+            const std::string msg = "Failed to accept. (errno: ";
+            throw std::runtime_error(msg + std::strerror(errno) + ")");
+        }
+
+        std::cout << "Accepted client #" << clientSocket->getFD() << std::endl;
+        _clientPoller.insertNewSocket(clientSocket);
+
+        return Socket::HandleResult::CONTINUE;
+    }
+};
+
+void server(SocketPoll& clientPoller)
 {
     // Start server.
-    auto server = std::make_shared<ServerSocket>();
+    auto server = std::make_shared<ServerSocket>(clientPoller);
     if (!server->bind(addr))
     {
         const std::string msg = "Failed to bind. (errno: ";
@@ -245,51 +172,30 @@ void server(SocketPoll<SimpleResponseClient>& poller)
         throw std::runtime_error(msg + std::strerror(errno) + ")");
     }
 
+    SocketPoll serverPoll;
+
+    serverPoll.insertNewSocket(server);
+
     std::cout << "Listening." << std::endl;
     for (;;)
     {
-        if (server->pollRead(30000))
-        {
-            std::shared_ptr<SimpleResponseClient> clientSocket = server->accept<SimpleResponseClient>();
-            if (!clientSocket)
-            {
-                const std::string msg = "Failed to accept. (errno: ";
-                throw std::runtime_error(msg + std::strerror(errno) + ")");
-            }
-
-            std::cout << "Accepted client #" << clientSocket->getFD() << std::endl;
-            poller.insertNewSocket(clientSocket);
-        }
+        serverPoll.poll(30000);
     }
 }
 
 /// Poll client sockets and do IO.
-void pollAndComm(SocketPoll<SimpleResponseClient>& poller, std::atomic<bool>& stop)
+void pollAndComm(SocketPoll& poller, std::atomic<bool>& stop)
 {
     while (!stop)
     {
-        poller.poll(5000, [](const std::shared_ptr<SimpleResponseClient>& socket, const int events)
-        {
-            bool closeSocket = false;
-
-            if (events & POLLIN)
-                closeSocket = !socket->readIncomingData();
-
-            if (events & POLLOUT)
-                socket->writeOutgoingData();
-
-            if (events & (POLLHUP | POLLERR | POLLNVAL))
-                closeSocket = true;
-
-            return !closeSocket;
-        });
+        poller.poll(5000);
     }
 }
 
 int main(int, const char**)
 {
     // Used to poll client sockets.
-    SocketPoll<SimpleResponseClient> poller;
+    SocketPoll poller;
 
     // Start the client polling thread.
     Thread threadPoll([&poller](std::atomic<bool>& stop)
diff --git a/net/socket.hpp b/net/socket.hpp
index 17a3d9d..449f6e0 100644
--- a/net/socket.hpp
+++ b/net/socket.hpp
@@ -29,7 +29,7 @@ public:
     {
     }
 
-    ~Socket()
+    virtual ~Socket()
     {
         //TODO: Should we shutdown here or up to the client?
 
@@ -40,6 +40,14 @@ public:
     // Returns the OS native socket fd.
     int getFD() const { return _fd; }
 
+    /// Return a mask of events we should be polling for
+    virtual int getPollEvents() = 0;
+
+    /// Handle results of events returned from poll
+    enum HandleResult { CONTINUE, SOCKET_CLOSED };
+    virtual HandleResult handlePoll( int events ) = 0;
+
+
     /// Sets the send buffer in size bytes.
     /// Must be called before accept or connect.
     /// Note: TCP will allocate twice this size for admin purposes,
@@ -106,88 +114,164 @@ public:
         return rc;
     }
 
-    /// Poll the socket for either read, write, or both.
-    /// Returns -1 on failure/error (query socket error), 0 for timeout,
-    /// otherwise, depending on events, the respective bits set.
-    int poll(const int timeoutMs, const int events = POLLIN | POLLOUT)
+protected:
+
+    /// Construct based on an existing socket fd.
+    /// Used by accept() only.
+    Socket(const int fd) :
+        _fd(fd)
     {
-        // Use poll(2) as it has lower overhead for up to
-        // a few hundred sockets compared to epoll(2).
-        // Also it has a more intuitive API and portable.
-        pollfd pollfd;
-        memset(&pollfd, 0, sizeof(pollfd));
+    }
 
-        pollfd.fd = getFD();
-        pollfd.events |= events;
+private:
+    const int _fd;
+};
 
-        int rc;
-        do
+
+/// Handles non-blocking socket event polling.
+/// Only polls on N-Sockets and invokes callback and
+/// doesn't manage buffers or client data.
+/// Note: uses poll(2) since it has very good performance
+/// compared to epoll up to a few hundred sockets and
+/// doesn't suffer select(2)'s poor API. Since this will
+/// be used per-document we don't expect to have several
+/// hundred users on same document to suffer poll(2)'s
+/// scalability limit. Meanwhile, epoll(2)'s high
+/// overhead to adding/removing sockets is not helpful.
+class SocketPoll
+{
+public:
+    SocketPoll()
+    {
+        // Create the wakeup fd.
+        if (::pipe2(_wakeup, O_CLOEXEC | O_NONBLOCK) == -1)
         {
-            // Technically, on retrying we should wait
-            // the _remaining_ time, alas simplicity wins.
-            rc = ::poll(&pollfd, 1, timeoutMs);
+            // FIXME: Can't have wakeup pipe, should we exit?
+            // FIXME: running out of sockets should be a case we handle elegantly here - and also in our accept / ClientSocket creation I guess.
+            _wakeup[0] = -1;
+            _wakeup[1] = -1;
         }
-        while (rc < 0 && errno == EINTR);
+    }
+
+    ~SocketPoll()
+    {
+        ::close(_wakeup[0]);
+        ::close(_wakeup[1]);
+    }
+
+    /// Poll the sockets for available data to read or buffer to write.
+    void poll(const int timeoutMs)
+    {
+        const size_t size = _pollSockets.size();
 
-        if (rc <= 0)
+        // The events to poll on change each spin of the loop.
+        setupPollFds();
+
+        int rc;
+        do
         {
-            return rc;
+            rc = ::poll(&_pollFds[0], size + 1, timeoutMs);
         }
+        while (rc < 0 && errno == EINTR);
 
-        int revents = 0;
-        if (rc == 1)
+        // Fire the callback and remove dead fds.
+        for (int i = static_cast<int>(size) - 1; i >= 0; --i)
         {
-            if (pollfd.revents & (POLLERR|POLLHUP|POLLNVAL))
+            if (_pollFds[i].revents)
             {
-                // Probe socket for error.
-                return -1;
+                if (_pollSockets[i]->handlePoll(_pollFds[i].revents) ==
+                    Socket::HandleResult::SOCKET_CLOSED)
+                {
+                    std::cout << "Removing: " << _pollFds[i].fd << std::endl;
+                    _pollSockets.erase(_pollSockets.begin() + i);
+                    // Don't remove from pollFds; we'll recreate below.
+                }
             }
+        }
 
-            if (pollfd.revents & (POLLIN|POLLPRI))
-            {
-                // Data ready to read.
-                revents |= POLLIN;
-            }
+        // Process the wakeup pipe (always the last entry).
+        if (_pollFds[size].revents)
+        {
+            // Add new sockets first.
+            addNewSocketsToPoll();
 
-            if (pollfd.revents & POLLOUT)
+            // Clear the data.
+            int dump;
+            if (::read(_wakeup[0], &dump, sizeof(dump)) == -1)
             {
-                // Ready for write.
-                revents |= POLLOUT;
+                // Nothing to do.
             }
         }
-
-        return revents;
     }
 
-    /// Poll the socket for readability.
-    /// Returns true when there is data to read, otherwise false.
-    bool pollRead(const int timeoutMs)
+    /// Insert a new socket to be polled.
+    /// Sockets are removed only when the handler return false.
+    void insertNewSocket(const std::shared_ptr<Socket>& newSocket)
     {
-        const int rc = poll(timeoutMs, POLLIN);
-        return (rc > 0 && (rc & POLLIN));
+        std::lock_guard<std::mutex> lock(_mutex);
+
+        _newSockets.emplace_back(newSocket);
+
+        // wakeup the main-loop.
+        if (::write(_wakeup[1], "w", 1) == -1)
+        {
+            // wakeup pipe is already full.
+            assert(errno == EAGAIN || errno == EWOULDBLOCK);
+        }
     }
 
-    /// Poll the socket for writability.
-    /// Returns true when socket is ready for writing, otherwise false.
-    bool pollWrite(const int timeoutMs)
+private:
+
+    /// Add the new sockets to list of those to poll.
+    void addNewSocketsToPoll()
     {
-        const int rc = poll(timeoutMs, POLLOUT);
-        return (rc > 0 && (rc & POLLOUT));
+        std::lock_guard<std::mutex> lock(_mutex);
+
+        // Copy the new sockets over and clear.
+        _pollSockets.insert(_pollSockets.end(), _newSockets.begin(), _newSockets.end());
+        _newSockets.clear();
     }
 
-protected:
+    void removeSocketFromPoll(const std::shared_ptr<Socket>& socket)
+    {
+        auto it = std::find(_pollSockets.begin(), _pollSockets.end(), socket);
+        assert (it != _pollSockets.end());
+        _pollSockets.erase(it);
+    }
 
-    /// Construct based on an existing socket fd.
-    /// Used by accept() only.
-    Socket(const int fd) :
-        _fd(fd)
+    /// Initialize the poll fds array with the right events
+    void setupPollFds()
     {
+        const size_t size = _pollSockets.size();
+
+        _pollFds.resize(size + 1); // + wakeup pipe
+
+        for (size_t i = 0; i < size; ++i)
+        {
+            _pollFds[i].fd = _pollSockets[i]->getFD();
+            _pollFds[i].events = _pollSockets[i]->getPollEvents();
+            _pollFds[i].revents = 0;
+        }
+
+        // Add the read-end of the wake pipe.
+        _pollFds[size].fd = _wakeup[0];
+        _pollFds[size].events = POLLIN;
+        _pollFds[size].revents = 0;
     }
 
 private:
-    const int _fd;
+    /// main-loop wakeup pipe
+    int _wakeup[2];
+    /// The sockets we're controlling
+    std::vector<std::shared_ptr<Socket>> _pollSockets;
+    /// Protects _newSockets
+    std::mutex _mutex;
+    std::vector<std::shared_ptr<Socket>> _newSockets;
+    /// The fds to poll.
+    std::vector<pollfd> _pollFds;
 };
 
+
 /// A non-blocking, client socket.
 class ClientSocket : public Socket
 {
@@ -197,36 +281,28 @@ public:
     {
     }
 
-    /// Connect to a server address.
-    /// Does not retry on error.
-    /// timeoutMs can be 0 to avoid waiting, or -1 to wait forever.
-    /// Returns true on success only.
-    /// Note: when succceeds, caller must check for
-    /// EINPROGRESS and poll for write, then getError(),
-    /// only when the latter returns 0 we are connected.
-    bool connect(const Poco::Net::SocketAddress& address, const int timeoutMs = 0)
+  protected:
+    std::vector< unsigned char > _inBuffer;
+    std::vector< unsigned char > _outBuffer;
+  public:
+
+    HandleResult handlePoll( int events ) override
     {
-        const int rc = ::connect(getFD(), address.addr(), address.length());
-        if (rc == 0)
-        {
-            return true;
-        }
+        bool closeSocket = false;
 
-        if (errno != EINPROGRESS)
-        {
-            return false;
-        }
+        if (events & POLLIN)
+            closeSocket = !readIncomingData();
 
-        // Wait for writable, then check again.
-        pollWrite(timeoutMs);
+        if (events & POLLOUT)
+            writeOutgoingData();
 
-        // Now check if we connected, not, or not yet.
-        return (getError() == 0 || errno == EINPROGRESS);
+        if (events & (POLLHUP | POLLERR | POLLNVAL))
+            closeSocket = true;
+
+        return closeSocket ? HandleResult::SOCKET_CLOSED :
+                             HandleResult::CONTINUE;
     }
-  protected:
-    std::vector< unsigned char > _inBuffer;
-    std::vector< unsigned char > _outBuffer;
-  public:
+
     bool readIncomingData()
     {
         ssize_t len;
@@ -260,7 +336,7 @@ public:
         // else poll will handle errors
     }
 
-    int getPollEvents()
+    int getPollEvents() override
     {
         int pollFor = POLLIN;
         if (_outBuffer.size() > 0)
@@ -279,47 +355,4 @@ protected:
     friend class ServerSocket;
 };
 
-/// A non-blocking, streaming socket.
-class ServerSocket : public Socket
-{
-public:
-
-    /// Binds to a local address (Servers only).
-    /// Does not retry on error.
-    /// Returns true on success only.
-    bool bind(const Poco::Net::SocketAddress& address)
-    {
-        // Enable address reuse to avoid stalling after
-        // recycling, when previous socket is TIME_WAIT.
-        //TODO: Might be worth refactoring out.
-        const int reuseAddress = 1;
-        constexpr unsigned int len = sizeof(reuseAddress);
-        ::setsockopt(getFD(), SOL_SOCKET, SO_REUSEADDR, &reuseAddress, len);
-
-        const int rc = ::bind(getFD(), address.addr(), address.length());
-        return (rc == 0);
-    }
-
-    /// Listen to incoming connections (Servers only).
-    /// Does not retry on error.
-    /// Returns true on success only.
-    bool listen(const int backlog = 64)
-    {
-        const int rc = ::listen(getFD(), backlog);
-        return (rc == 0);
-    }
-
-    /// Accepts an incoming connection (Servers only).
-    /// Does not retry on error.
-    /// Returns a valid Socket shared_ptr on success only.
-    template <typename T>
-       std::shared_ptr<T> accept()
-    {
-        // Accept a connection (if any) and set it to non-blocking.
-        // We don't care about the client's address, so ignored.
-        const int rc = ::accept4(getFD(), nullptr, nullptr, SOCK_NONBLOCK);
-        return std::shared_ptr<T>(rc != -1 ? new T(rc) : nullptr);
-    }
-};
-
 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */


More information about the Libreoffice-commits mailing list