[Libreoffice-commits] online.git: Branch 'private/Ashod/nonblocking' - net/clientnb.cpp net/loolnb.cpp net/socket.hpp

Michael Meeks michael.meeks at collabora.com
Tue Feb 14 23:47:49 UTC 2017


 net/clientnb.cpp |    3 -
 net/loolnb.cpp   |  125 +++++++++++++++++++------------------------------------
 net/socket.hpp   |   55 ++++++++++++++++++------
 3 files changed, 88 insertions(+), 95 deletions(-)

New commits:
commit ded9607faf8e3e1fa122b200353a3465c94347ae
Author: Michael Meeks <michael.meeks at collabora.com>
Date:   Tue Feb 14 23:45:24 2017 +0000

    Implement basic buffering.
    
    The socket now buffers input, and output, updates its poll record too.
    We pass a simple message from client to server and back using lamers HTTP.
    Sub-classed ClientSocket to provide a simple message handler.
       not very convinced by templatization here, but made it consistent.
       more ideal to have some virtual socket pieces.

diff --git a/net/clientnb.cpp b/net/clientnb.cpp
index e4a7fc3..ec7c578 100644
--- a/net/clientnb.cpp
+++ b/net/clientnb.cpp
@@ -82,7 +82,8 @@ public:
             std::cerr << "try to get response\n";
             std::istream& responseStream = session->receiveResponse(response);
 
-            std::cerr << "Got response '" << responseStream << "'\n";
+            std::string result(std::istreambuf_iterator<char>(responseStream), {});
+            std::cerr << "Got response '" << result << "'\n";
         }
         catch (const Poco::Exception &e)
         {
diff --git a/net/loolnb.cpp b/net/loolnb.cpp
index 952c44a..d457471 100644
--- a/net/loolnb.cpp
+++ b/net/loolnb.cpp
@@ -24,6 +24,34 @@
 
 constexpr int PortNumber = 9191;
 
+class SimpleResponseClient : public ClientSocket
+{
+public:
+    SimpleResponseClient(const int fd) :
+        ClientSocket(fd)
+    {
+    }
+    virtual void handleIncomingMessage() override
+    {
+        std::cerr << "message had size " << _inBuffer.size() << "\n";
+        std::ostringstream oss;
+        oss << "HTTP/1.1 200 OK\r\n"
+            << "Date: Once, Upon a time GMT\r\n" // Mon, 27 Jul 2009 12:28:53 GMT
+            << "Server: madeup string (Linux)\r\n"
+            << "Content-Length: " << _inBuffer.size() << "\r\n"
+            << "Content-Type: text/plain\r\n"
+            << "Connection: Closed\r\n"
+            << "\r\n"
+            ;
+        std::string str = oss.str();
+        _outBuffer.insert(_outBuffer.end(), str.begin(), str.end());
+
+        // append the content we got:
+        _outBuffer.insert(_outBuffer.end(), _inBuffer.begin(), _inBuffer.end());
+        _inBuffer.clear();
+    }
+};
+
 /// Handles non-blocking socket event polling.
 /// Only polls on N-Sockets and invokes callback and
 /// doesn't manage buffers or client data.
@@ -102,7 +130,7 @@ public:
 
     /// Insert a new socket to be polled.
     /// Sockets are removed only when the handler return false.
-    void insertNewSocket(const std::shared_ptr<ClientSocket>& newSocket)
+    void insertNewSocket(const std::shared_ptr<T>& newSocket)
     {
         std::lock_guard<std::mutex> lock(_mutex);
 
@@ -138,8 +166,7 @@ private:
         for (size_t i = 0; i < size; ++i)
         {
             _pollFds[i].fd = _pollSockets[i]->getFD();
-            //TODO: Get from the socket.
-            _pollFds[i].events = POLLIN | POLLOUT;
+            _pollFds[i].events = _pollSockets[i]->getPollEvents();
             _pollFds[i].revents = 0;
         }
 
@@ -153,10 +180,10 @@ private:
     /// main-loop wakeup pipe
     int _wakeup[2];
     /// The sockets we're controlling
-    std::vector<std::shared_ptr<ClientSocket>> _pollSockets;
+    std::vector<std::shared_ptr<T>> _pollSockets;
     /// Protects _newSockets
     std::mutex _mutex;
-    std::vector<std::shared_ptr<ClientSocket>> _newSockets;
+    std::vector<std::shared_ptr<T>> _newSockets;
     /// The fds to poll.
     std::vector<pollfd> _pollFds;
 };
@@ -197,39 +224,7 @@ private:
 
 Poco::Net::SocketAddress addr("127.0.0.1", PortNumber);
 
-void client(const int timeoutMs)
-{
-    const auto client = std::make_shared<ClientSocket>();
-    if (!client->connect(addr, timeoutMs) && errno != EINPROGRESS)
-    {
-        const std::string msg = "Failed to call connect. (errno: ";
-        throw std::runtime_error(msg + std::strerror(errno) + ")");
-    }
-
-    std::cout << "Connected " << client->getFD() << std::endl;
-
-    client->send("1", 1);
-    int sent = 1;
-    while (sent > 0 && client->pollRead(5000))
-    {
-        char buf[1024];
-        const int recv = client->recv(buf, sizeof(buf));
-        if (recv <= 0)
-        {
-            perror("recv");
-            break;
-        }
-        else
-        {
-            const std::string msg = std::string(buf, recv);
-            const int num = stoi(msg);
-            const std::string new_msg = std::to_string(num + 1);
-            sent = client->send(new_msg.data(), new_msg.size());
-        }
-    }
-}
-
-void server(SocketPoll<ClientSocket>& poller)
+void server(SocketPoll<SimpleResponseClient>& poller)
 {
     // Start server.
     auto server = std::make_shared<ServerSocket>();
@@ -250,7 +245,7 @@ void server(SocketPoll<ClientSocket>& poller)
     {
         if (server->pollRead(30000))
         {
-            std::shared_ptr<ClientSocket> clientSocket = server->accept();
+            std::shared_ptr<SimpleResponseClient> clientSocket = server->accept<SimpleResponseClient>();
             if (!clientSocket)
             {
                 const std::string msg = "Failed to accept. (errno: ";
@@ -264,44 +259,21 @@ void server(SocketPoll<ClientSocket>& poller)
 }
 
 /// Poll client sockets and do IO.
-void pollAndComm(SocketPoll<ClientSocket>& poller, std::atomic<bool>& stop)
+void pollAndComm(SocketPoll<SimpleResponseClient>& poller, std::atomic<bool>& stop)
 {
     while (!stop)
     {
-        poller.poll(5000, [](const std::shared_ptr<ClientSocket>& socket, const int events)
+        poller.poll(5000, [](const std::shared_ptr<SimpleResponseClient>& socket, const int events)
         {
             if (events & POLLIN)
-            {
-                char buf[1024];
-                const int recv = socket->recv(buf, sizeof(buf));
-                if (recv <= 0)
-                {
-                    perror("recv");
-                    return false;
-                }
+                socket->readIncomingData();
 
-                if (events & POLLOUT)
-                {
-                    const std::string msg = std::string(buf, recv);
-                    const int num = stoi(msg);
-                    if ((num % (1<<16)) == 1)
-                    {
-                        std::cout << "Client #" << socket->getFD() << ": " << msg << std::endl;
-                    }
-                    const std::string new_msg = std::to_string(num + 1);
-                    const int sent = socket->send(new_msg.data(), new_msg.size());
-                    if (sent != static_cast<int>(new_msg.size()))
-                    {
-                        perror("send");
-                        return false;
-                    }
-                }
-                else
-                {
-                    // Normally we'd buffer the response, but for now...
-                    std::cerr << "Client #" << socket->getFD()
-                            << ": ERROR - socket not ready for write." << std::endl;
-                }
+            if (events & POLLOUT)
+                socket->writeOutgoingData();
+
+            if (events & (POLLHUP | POLLERR | POLLNVAL))
+            {
+                // FIXME - close and remove the socket ...
             }
 
             return true;
@@ -309,17 +281,10 @@ void pollAndComm(SocketPoll<ClientSocket>& poller, std::atomic<bool>& stop)
     }
 }
 
-int main(int argc, const char**)
+int main(int, const char**)
 {
-    if (argc > 1)
-    {
-        // We are now the client application.
-        client(0);
-        return 0;
-    }
-
     // Used to poll client sockets.
-    SocketPoll<ClientSocket> poller;
+    SocketPoll<SimpleResponseClient> poller;
 
     // Start the client polling thread.
     Thread threadPoll([&poller](std::atomic<bool>& stop)
diff --git a/net/socket.hpp b/net/socket.hpp
index 83202b4..55ca6b7 100644
--- a/net/socket.hpp
+++ b/net/socket.hpp
@@ -223,25 +223,51 @@ public:
         // Now check if we connected, not, or not yet.
         return (getError() == 0 || errno == EINPROGRESS);
     }
+  protected:
+    std::vector< unsigned char > _inBuffer;
+    std::vector< unsigned char > _outBuffer;
+  public:
+    void readIncomingData()
+    {
+        ssize_t len;
+        unsigned char buf[4096];
+        do {
+            len = ::read(getFD(), buf, sizeof(buf));
+        } while (len < 0 && errno == EINTR);
+        if (len > 0)
+        {
+            assert (len < ssize_t(sizeof(buf)));
+            _inBuffer.insert(_inBuffer.end(), &buf[0], &buf[len]);
+            handleIncomingMessage();
+        }
+        // else poll will handle errors.
+    }
 
-    /// Send data to our peer.
-    /// Returns the number of bytes sent, -1 on error.
-    int send(const void* buf, const size_t len)
+    void writeOutgoingData()
     {
-        // Don't SIGPIPE when the other end closes.
-        const int rc = ::send(getFD(), buf, len, MSG_NOSIGNAL);
-        return rc;
+        assert (_outBuffer.size() > 0);
+        ssize_t len;
+        do {
+            len = ::write(getFD(), &_outBuffer[0], _outBuffer.size());
+        } while (len < 0 && errno == EINTR);
+        if (len > 0)
+        {
+            _outBuffer.erase(_outBuffer.begin(),
+                             _outBuffer.begin() + len);
+        }
+        // else poll will handle errors
     }
 
-    /// Receive data from our peer.
-    /// Returns the number of bytes received, -1 on error,
-    /// and 0 when the peer has performed an orderly shutdown.
-    int recv(void* buf, const size_t len)
+    int getPollEvents()
     {
-        const int rc = ::recv(getFD(), buf, len, 0);
-        return rc;
+        int pollFor = POLLIN | POLLPRI;
+        if (_outBuffer.size() > 0)
+            pollFor |= POLLOUT;
+        return pollFor;
     }
 
+    virtual void handleIncomingMessage() = 0;
+
 protected:
     ClientSocket(const int fd) :
         Socket(fd)
@@ -284,12 +310,13 @@ public:
     /// Accepts an incoming connection (Servers only).
     /// Does not retry on error.
     /// Returns a valid Socket shared_ptr on success only.
-    std::shared_ptr<ClientSocket> accept()
+    template <typename T>
+       std::shared_ptr<T> accept()
     {
         // Accept a connection (if any) and set it to non-blocking.
         // We don't care about the client's address, so ignored.
         const int rc = ::accept4(getFD(), nullptr, nullptr, SOCK_NONBLOCK);
-        return std::shared_ptr<ClientSocket>(rc != -1 ? new ClientSocket(rc) : nullptr);
+        return std::shared_ptr<T>(rc != -1 ? new T(rc) : nullptr);
     }
 };
 


More information about the Libreoffice-commits mailing list