[Libreoffice-commits] online.git: Branch 'private/Ashod/nonblocking' - net/Socket.hpp net/WebSocketHandler.hpp wsd/ClientSession.cpp wsd/ClientSession.hpp wsd/LOOLWSD.cpp wsd/SenderQueue.hpp

Michael Meeks michael.meeks at collabora.com
Mon Mar 6 16:28:08 UTC 2017


 net/Socket.hpp           |    5 +---
 net/WebSocketHandler.hpp |   11 ++++++++++
 wsd/ClientSession.cpp    |   49 ++++++++++++++++++++++-------------------------
 wsd/ClientSession.hpp    |    5 ++--
 wsd/LOOLWSD.cpp          |   19 ++++++++++++++++++
 wsd/SenderQueue.hpp      |    8 +++++++
 6 files changed, 66 insertions(+), 31 deletions(-)

New commits:
commit b1e55b92933a44fe18fa694740a650174a393bb7
Author: Michael Meeks <michael.meeks at collabora.com>
Date:   Mon Mar 6 16:26:52 2017 +0000

    ClientSession: kill the writing thread.

diff --git a/net/Socket.hpp b/net/Socket.hpp
index 3090297..e3434bb 100644
--- a/net/Socket.hpp
+++ b/net/Socket.hpp
@@ -354,10 +354,10 @@ public:
     virtual void handleIncomingMessage() = 0;
 
     /// Is there queued up data that we want to write ?
-    virtual bool hasQueuedWrites() const { return false; }
+    virtual bool hasQueuedWrites() const = 0;
 
     /// Do some of the queued writing.
-    virtual void performWrites() {}
+    virtual void performWrites() = 0;
 
     /// Called when the socket is disconnected and will be destroyed.
     /// Will be called exactly once.
@@ -393,7 +393,6 @@ public:
 
     int getPollEvents() override
     {
-        std::cerr << "empty ? " << _outBuffer.empty() << " has queued write " << _socketHandler->hasQueuedWrites() << "\n";
         if (!_outBuffer.empty() || _socketHandler->hasQueuedWrites())
             return POLLIN | POLLOUT;
         else
diff --git a/net/WebSocketHandler.hpp b/net/WebSocketHandler.hpp
index feae544..4ea2fb7 100644
--- a/net/WebSocketHandler.hpp
+++ b/net/WebSocketHandler.hpp
@@ -210,6 +210,17 @@ public:
         _wsPayload.clear();
     }
 
+    bool hasQueuedWrites() const override
+    {
+        LOG_TRC("WebSocket - asked for queued writes");
+        return false;
+    }
+
+    void performWrites() override
+    {
+        assert(false);
+    }
+
     void sendFrame(const std::string& msg) const
     {
         sendMessage(msg.data(), msg.size(), WSOpCode::Text);
diff --git a/wsd/ClientSession.cpp b/wsd/ClientSession.cpp
index edcf34c..ec2e8fd 100644
--- a/wsd/ClientSession.cpp
+++ b/wsd/ClientSession.cpp
@@ -39,9 +39,6 @@ ClientSession::ClientSession(const std::string& id,
     _stop(false)
 {
     LOG_INF("ClientSession ctor [" << getName() << "].");
-
-    // FIXME: one thread per client session [!?].
-    _senderThread = std::thread([this]{ senderThread(); });
 }
 
 ClientSession::~ClientSession()
@@ -49,10 +46,6 @@ ClientSession::~ClientSession()
     LOG_INF("~ClientSession dtor [" << getName() << "].");
 
     stop();
-    if (_senderThread.joinable())
-    {
-        _senderThread.join();
-    }
 }
 
 bool ClientSession::_handleInput(const char *buffer, int length)
@@ -434,36 +427,40 @@ void ClientSession::setReadOnly()
     sendTextFrame("perm: readonly");
 }
 
-void ClientSession::senderThread()
+bool ClientSession::hasQueuedWrites() const
 {
-    LOG_DBG(getName() << " SenderThread started");
+    LOG_DBG(getName() << " ClientSession: has queued writes? "
+            << _senderQueue.size());
+    return _senderQueue.size() > 0;
+}
 
-    while (!stopping())
+void ClientSession::performWrites()
+{
+    LOG_DBG(getName() << " ClientSession: performing writes");
+
+    std::shared_ptr<Message> item;
+    if (_senderQueue.waitDequeue(item, 0 /* ms - don't block */))
     {
-        std::shared_ptr<Message> item;
-        if (_senderQueue.waitDequeue(item, static_cast<size_t>(COMMAND_TIMEOUT_MS)))
+        const std::vector<char>& data = item->data();
+        try
         {
-            const std::vector<char>& data = item->data();
-            try
+            if (item->isBinary())
             {
-                if (item->isBinary())
-                {
-                    Session::sendBinaryFrame(data.data(), data.size());
-                }
-                else
-                {
-                    Session::sendTextFrame(data.data(), data.size());
-                }
+                Session::sendBinaryFrame(data.data(), data.size());
             }
-            catch (const std::exception& ex)
+            else
             {
-                LOG_ERR("Failed to send message [" << LOOLProtocol::getAbbreviatedMessage(data) <<
-                        "] to " << getName() << ": " << ex.what());
+                Session::sendTextFrame(data.data(), data.size());
             }
         }
+        catch (const std::exception& ex)
+        {
+            LOG_ERR("Failed to send message [" << LOOLProtocol::getAbbreviatedMessage(data) <<
+                    "] to " << getName() << ": " << ex.what());
+        }
     }
 
-    LOG_DBG(getName() << " SenderThread finished");
+    LOG_DBG(getName() << " ClientSession: performed write");
 }
 
 bool ClientSession::handleKitToClientMessage(const char* buffer, const int length)
diff --git a/wsd/ClientSession.hpp b/wsd/ClientSession.hpp
index b2ecc8a..97f5591 100644
--- a/wsd/ClientSession.hpp
+++ b/wsd/ClientSession.hpp
@@ -112,6 +112,9 @@ public:
     /// Set WOPI fileinfo object
     void setWopiFileInfo(std::unique_ptr<WopiStorage::WOPIFileInfo>& wopiFileInfo) { _wopiFileInfo = std::move(wopiFileInfo); }
 
+    bool hasQueuedWrites() const override;
+    void performWrites() override;
+
 private:
     virtual bool _handleInput(const char* buffer, int length) override;
 
@@ -138,8 +141,6 @@ private:
     /// Eg. in readonly mode only few messages should be allowed
     bool filterMessage(const std::string& msg) const;
 
-    void senderThread();
-
 private:
     std::weak_ptr<DocumentBroker> _docBroker;
 
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 51d4430..68bc905 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -1800,6 +1800,25 @@ private:
         }
     }
 
+    bool hasQueuedWrites() const override
+    {
+        // FIXME: - the session should be owning the fd in DocumentBroker's _poll
+        if (_clientSession)
+            return _clientSession->hasQueuedWrites();
+
+        LOG_TRC("ClientRequestDispatcher - asked for queued writes");
+        return false;
+    }
+
+    void performWrites() override
+    {
+        // FIXME: - the session should be owning the fd in DocumentBroker's _poll
+        if (_clientSession)
+            return _clientSession->performWrites();
+
+        assert (false);
+    }
+
     void handleFileServerRequest(const Poco::Net::HTTPRequest& request, Poco::MemoryInputStream& message)
     {
         auto socket = _socket.lock();
diff --git a/wsd/SenderQueue.hpp b/wsd/SenderQueue.hpp
index c2dc60c..26bc8dc 100644
--- a/wsd/SenderQueue.hpp
+++ b/wsd/SenderQueue.hpp
@@ -26,6 +26,8 @@
 #include "Log.hpp"
 #include "TileDesc.hpp"
 
+#include "Socket.hpp" // FIXME: hack for wakeup-world ...
+
 struct SendItem
 {
     std::weak_ptr<LOOLWebSocket> Socket;
@@ -55,6 +57,7 @@ public:
     size_t enqueue(const Item& item)
     {
         std::unique_lock<std::mutex> lock(_mutex);
+        bool wasEmpty = _queue.empty();
         if (!stopping())
         {
             if (deduplicate(item))
@@ -67,6 +70,11 @@ public:
         lock.unlock();
 
         _cv.notify_one();
+        if (wasEmpty)
+        {
+            // FIXME: Horrible hack - we need to wakeup just our own poll ...
+            SocketPoll::wakeupWorld();
+        }
         return queuesize;
     }
 


More information about the Libreoffice-commits mailing list