[Libreoffice-commits] online.git: Branch 'private/Ashod/nonblocking' - net/loolnb.cpp

Ashod Nakashian ashod.nakashian at collabora.co.uk
Tue Feb 14 03:29:32 UTC 2017


 net/loolnb.cpp |  210 +++++++++++++++++++++++++++++++--------------------------
 1 file changed, 117 insertions(+), 93 deletions(-)

New commits:
commit b9f4bed8ea75138ed04a66ffc6ccfd76396cc832
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date:   Mon Feb 13 21:49:59 2017 -0500

    nb: separate Socket into ClientSocket and ServerSocket
    
    Change-Id: I1aafd6192b955e51b8f1e74c1aad5fc3603f71d6
    Reviewed-on: https://gerrit.libreoffice.org/34237
    Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
    Tested-by: Ashod Nakashian <ashnakash at gmail.com>

diff --git a/net/loolnb.cpp b/net/loolnb.cpp
index 07c5c82..0ca4faf 100644
--- a/net/loolnb.cpp
+++ b/net/loolnb.cpp
@@ -127,87 +127,6 @@ public:
         return rc;
     }
 
-    /// Connect to a server address.
-    /// Does not retry on error.
-    /// timeoutMs can be 0 to avoid waiting, or -1 to wait forever.
-    /// Returns true on success only.
-    /// Note: when succceeds, caller must check for
-    /// EINPROGRESS and poll for write, then getError(),
-    /// only when the latter returns 0 we are connected.
-    bool connect(const SocketAddress& address, const int timeoutMs = 0)
-    {
-        const int rc = ::connect(_fd, address.addr(), address.length());
-        if (rc == 0)
-        {
-            return true;
-        }
-
-        if (errno != EINPROGRESS)
-        {
-            return false;
-        }
-
-        // Wait for writable, then check again.
-        pollWrite(timeoutMs);
-
-        // Now check if we connected, not, or not yet.
-        return (getError() == 0 || errno == EINPROGRESS);
-    }
-
-    /// Binds to a local address (Servers only).
-    /// Does not retry on error.
-    /// Returns true on success only.
-    bool bind(const SocketAddress& address)
-    {
-        // Enable address reuse to avoid stalling after
-        // recycling, when previous socket is TIME_WAIT.
-        //TODO: Might be worth refactoring out.
-        const int reuseAddress = 1;
-        constexpr unsigned int len = sizeof(reuseAddress);
-        ::setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR, &reuseAddress, len);
-
-        const int rc = ::bind(_fd, address.addr(), address.length());
-        return (rc == 0);
-    }
-
-    /// Listen to incoming connections (Servers only).
-    /// Does not retry on error.
-    /// Returns true on success only.
-    bool listen(const int backlog = 64)
-    {
-        const int rc = ::listen(_fd, backlog);
-        return (rc == 0);
-    }
-
-    /// Accepts an incoming connection (Servers only).
-    /// Does not retry on error.
-    /// Returns a valid Socket shared_ptr on success only.
-    std::shared_ptr<Socket> accept()
-    {
-        // Accept a connection (if any) and set it to non-blocking.
-        // We don't care about the client's address, so ignored.
-        const int rc = ::accept4(_fd, nullptr, nullptr, SOCK_NONBLOCK);
-        return std::shared_ptr<Socket>(rc != -1 ? new Socket(rc) : nullptr);
-    }
-
-    /// Send data to our peer.
-    /// Returns the number of bytes sent, -1 on error.
-    int send(const void* buf, const size_t len)
-    {
-        // Don't SIGPIPE when the other end closes.
-        const int rc = ::send(_fd, buf, len, MSG_NOSIGNAL);
-        return rc;
-    }
-
-    /// Receive data from our peer.
-    /// Returns the number of bytes received, -1 on error,
-    /// and 0 when the peer has performed an orderly shutdown.
-    int recv(void* buf, const size_t len)
-    {
-        const int rc = ::recv(_fd, buf, len, 0);
-        return rc;
-    }
-
     /// Poll the socket for either read, write, or both.
     /// Returns -1 on failure/error (query socket error), 0 for timeout,
     /// otherwise, depending on events, the respective bits set.
@@ -219,7 +138,7 @@ public:
         pollfd poll;
         memset(&poll, 0, sizeof(poll));
 
-        poll.fd = _fd;
+        poll.fd = fd();
         poll.events |= events;
 
         int rc;
@@ -277,7 +196,7 @@ public:
         return (rc > 0 && (rc & POLLOUT));
     }
 
-private:
+protected:
 
     /// Construct based on an existing socket fd.
     /// Used by accept() only.
@@ -290,6 +209,111 @@ private:
     const int _fd;
 };
 
+/// A non-blocking client socket: connects to a server, sends and receives.
+class ClientSocket : public Socket
+{
+public:
+    ClientSocket() :
+        Socket()
+    {
+    }
+
+    /// Connect to a server address.
+    /// Does not retry on error.
+    /// timeoutMs can be 0 to avoid waiting, or -1 to wait forever.
+    /// Returns true if connected, or if errno is still EINPROGRESS after polling.
+    /// Note: when this succeeds, the caller must check for
+    /// EINPROGRESS and poll for write, then call getError();
+    /// only when the latter returns 0 are we connected.
+    bool connect(const SocketAddress& address, const int timeoutMs = 0)
+    {
+        const int rc = ::connect(fd(), address.addr(), address.length());
+        if (rc == 0)
+        {
+            return true;
+        }
+
+        if (errno != EINPROGRESS)
+        {
+            return false;
+        }
+
+        // Wait for writable (up to timeoutMs), then check again.
+        pollWrite(timeoutMs);
+
+        // Now check whether we connected, failed, or are not connected yet.
+        return (getError() == 0 || errno == EINPROGRESS);
+    }
+
+    /// Send len bytes from buf to our peer.
+    /// Returns the number of bytes sent, -1 on error.
+    int send(const void* buf, const size_t len)
+    {
+        // MSG_NOSIGNAL: don't raise SIGPIPE when the other end closes.
+        const int rc = ::send(fd(), buf, len, MSG_NOSIGNAL);
+        return rc;
+    }
+
+    /// Receive up to len bytes from our peer into buf.
+    /// Returns the number of bytes received, -1 on error,
+    /// and 0 when the peer has performed an orderly shutdown.
+    int recv(void* buf, const size_t len)
+    {
+        const int rc = ::recv(fd(), buf, len, 0);
+        return rc;
+    }
+
+protected:
+    ClientSocket(const int fd) :
+        Socket(fd)
+    {
+    }
+
+    friend class ServerSocket;
+};
+
+/// A non-blocking server socket: binds, listens, and accepts client sockets.
+class ServerSocket : public Socket
+{
+public:
+
+    /// Binds to a local address (Servers only).
+    /// Does not retry on error.
+    /// Returns true on success only.
+    bool bind(const SocketAddress& address)
+    {
+        // Enable address reuse to avoid stalling after
+        // recycling, when the previous socket is in TIME_WAIT.
+        //TODO: Might be worth refactoring out.
+        const int reuseAddress = 1;
+        constexpr unsigned int len = sizeof(reuseAddress);
+        ::setsockopt(fd(), SOL_SOCKET, SO_REUSEADDR, &reuseAddress, len);
+
+        const int rc = ::bind(fd(), address.addr(), address.length());
+        return (rc == 0);
+    }
+
+    /// Listen to incoming connections (Servers only).
+    /// Does not retry on error.
+    /// Returns true on success only.
+    bool listen(const int backlog = 64)
+    {
+        const int rc = ::listen(fd(), backlog);
+        return (rc == 0);
+    }
+
+    /// Accepts an incoming connection (Servers only).
+    /// Does not retry on error.
+    /// Returns a valid ClientSocket shared_ptr on success, an empty one otherwise.
+    std::shared_ptr<ClientSocket> accept()
+    {
+        // Accept a connection (if any) and set it to non-blocking.
+        // We don't care about the client's address, so it is ignored.
+        const int rc = ::accept4(fd(), nullptr, nullptr, SOCK_NONBLOCK);
+        return std::shared_ptr<ClientSocket>(rc != -1 ? new ClientSocket(rc) : nullptr);
+    }
+};
+
 /// Handles non-blocking socket event polling.
 /// Only polls on N-Sockets and invokes callback and
 /// doesn't manage buffers or client data.
@@ -373,7 +397,7 @@ public:
 
     /// Insert a new socket to be polled.
     /// Sockets are removed only when the handler return false.
-    void insertNewSocket(const std::shared_ptr<Socket>& newSocket)
+    void insertNewSocket(const std::shared_ptr<ClientSocket>& newSocket)
     {
         std::lock_guard<std::mutex> lock(_mutex);
 
@@ -422,10 +446,10 @@ private:
     /// main-loop wakeup pipe
     int _wakeup[2];
     /// The sockets we're controlling
-    std::vector<std::shared_ptr<Socket>> _pollSockets;
+    std::vector<std::shared_ptr<ClientSocket>> _pollSockets;
     /// Protects _newSockets
     std::mutex _mutex;
-    std::vector<std::shared_ptr<Socket>> _newSockets;
+    std::vector<std::shared_ptr<ClientSocket>> _newSockets;
     /// The fds to poll.
     std::vector<pollfd> _pollFds;
 };
@@ -468,7 +492,7 @@ SocketAddress addr("127.0.0.1", PortNumber);
 
 void client(const int timeoutMs)
 {
-    const auto client = std::make_shared<Socket>();
+    const auto client = std::make_shared<ClientSocket>();
     if (!client->connect(addr, timeoutMs) && errno != EINPROGRESS)
     {
         const std::string msg = "Failed to call connect. (errno: ";
@@ -498,10 +522,10 @@ void client(const int timeoutMs)
     }
 }
 
-void server(SocketPoll<Socket>& poller)
+void server(SocketPoll<ClientSocket>& poller)
 {
     // Start server.
-    auto server = std::make_shared<Socket>();
+    auto server = std::make_shared<ServerSocket>();
     if (!server->bind(addr))
     {
         const std::string msg = "Failed to bind. (errno: ";
@@ -519,7 +543,7 @@ void server(SocketPoll<Socket>& poller)
     {
         if (server->pollRead(30000))
         {
-            std::shared_ptr<Socket> clientSocket = server->accept();
+            std::shared_ptr<ClientSocket> clientSocket = server->accept();
             if (!clientSocket)
             {
                 const std::string msg = "Failed to accept. (errno: ";
@@ -533,11 +557,11 @@ void server(SocketPoll<Socket>& poller)
 }
 
 /// Poll client sockets and do IO.
-void pollAndComm(SocketPoll<Socket>& poller, std::atomic<bool>& stop)
+void pollAndComm(SocketPoll<ClientSocket>& poller, std::atomic<bool>& stop)
 {
     while (!stop)
     {
-        poller.poll(5000, [](const std::shared_ptr<Socket>& socket, const int events)
+        poller.poll(5000, [](const std::shared_ptr<ClientSocket>& socket, const int events)
         {
             if (events & POLLIN)
             {
@@ -588,7 +612,7 @@ int main(int argc, const char**)
     }
 
     // Used to poll client sockets.
-    SocketPoll<Socket> poller;
+    SocketPoll<ClientSocket> poller;
 
     // Start the client polling thread.
     Thread threadPoll([&poller](std::atomic<bool>& stop)


More information about the Libreoffice-commits mailing list