[Libreoffice-commits] online.git: Branch 'private/Ashod/nonblocking' - net/Socket.hpp net/WebSocketHandler.hpp wsd/ClientSession.cpp wsd/ClientSession.hpp wsd/LOOLWSD.cpp wsd/SenderQueue.hpp
Michael Meeks
michael.meeks at collabora.com
Mon Mar 6 16:28:08 UTC 2017
net/Socket.hpp | 5 +---
net/WebSocketHandler.hpp | 11 ++++++++++
wsd/ClientSession.cpp | 49 ++++++++++++++++++++++-------------------------
wsd/ClientSession.hpp | 5 ++--
wsd/LOOLWSD.cpp | 19 ++++++++++++++++++
wsd/SenderQueue.hpp | 8 +++++++
6 files changed, 66 insertions(+), 31 deletions(-)
New commits:
commit b1e55b92933a44fe18fa694740a650174a393bb7
Author: Michael Meeks <michael.meeks at collabora.com>
Date: Mon Mar 6 16:26:52 2017 +0000
ClientSession: kill the writing thread.
diff --git a/net/Socket.hpp b/net/Socket.hpp
index 3090297..e3434bb 100644
--- a/net/Socket.hpp
+++ b/net/Socket.hpp
@@ -354,10 +354,10 @@ public:
virtual void handleIncomingMessage() = 0;
/// Is there queued up data that we want to write ?
- virtual bool hasQueuedWrites() const { return false; }
+ virtual bool hasQueuedWrites() const = 0;
/// Do some of the queued writing.
- virtual void performWrites() {}
+ virtual void performWrites() = 0;
/// Called when the is disconnected and will be destroyed.
/// Will be called exactly once.
@@ -393,7 +393,6 @@ public:
int getPollEvents() override
{
- std::cerr << "empty ? " << _outBuffer.empty() << " has queued write " << _socketHandler->hasQueuedWrites() << "\n";
if (!_outBuffer.empty() || _socketHandler->hasQueuedWrites())
return POLLIN | POLLOUT;
else
diff --git a/net/WebSocketHandler.hpp b/net/WebSocketHandler.hpp
index feae544..4ea2fb7 100644
--- a/net/WebSocketHandler.hpp
+++ b/net/WebSocketHandler.hpp
@@ -210,6 +210,17 @@ public:
_wsPayload.clear();
}
+ bool hasQueuedWrites() const override
+ {
+ LOG_TRC("WebSocket - asked for queued writes");
+ return false;
+ }
+
+ void performWrites() override
+ {
+ assert(false);
+ }
+
void sendFrame(const std::string& msg) const
{
sendMessage(msg.data(), msg.size(), WSOpCode::Text);
diff --git a/wsd/ClientSession.cpp b/wsd/ClientSession.cpp
index edcf34c..ec2e8fd 100644
--- a/wsd/ClientSession.cpp
+++ b/wsd/ClientSession.cpp
@@ -39,9 +39,6 @@ ClientSession::ClientSession(const std::string& id,
_stop(false)
{
LOG_INF("ClientSession ctor [" << getName() << "].");
-
- // FIXME: one thread per client session [!?].
- _senderThread = std::thread([this]{ senderThread(); });
}
ClientSession::~ClientSession()
@@ -49,10 +46,6 @@ ClientSession::~ClientSession()
LOG_INF("~ClientSession dtor [" << getName() << "].");
stop();
- if (_senderThread.joinable())
- {
- _senderThread.join();
- }
}
bool ClientSession::_handleInput(const char *buffer, int length)
@@ -434,36 +427,40 @@ void ClientSession::setReadOnly()
sendTextFrame("perm: readonly");
}
-void ClientSession::senderThread()
+bool ClientSession::hasQueuedWrites() const
{
- LOG_DBG(getName() << " SenderThread started");
+ LOG_DBG(getName() << " ClientSession: has queued writes? "
+ << _senderQueue.size());
+ return _senderQueue.size() > 0;
+}
- while (!stopping())
+void ClientSession::performWrites()
+{
+ LOG_DBG(getName() << " ClientSession: performing writes");
+
+ std::shared_ptr<Message> item;
+ if (_senderQueue.waitDequeue(item, 0 /* ms - don't block */))
{
- std::shared_ptr<Message> item;
- if (_senderQueue.waitDequeue(item, static_cast<size_t>(COMMAND_TIMEOUT_MS)))
+ const std::vector<char>& data = item->data();
+ try
{
- const std::vector<char>& data = item->data();
- try
+ if (item->isBinary())
{
- if (item->isBinary())
- {
- Session::sendBinaryFrame(data.data(), data.size());
- }
- else
- {
- Session::sendTextFrame(data.data(), data.size());
- }
+ Session::sendBinaryFrame(data.data(), data.size());
}
- catch (const std::exception& ex)
+ else
{
- LOG_ERR("Failed to send message [" << LOOLProtocol::getAbbreviatedMessage(data) <<
- "] to " << getName() << ": " << ex.what());
+ Session::sendTextFrame(data.data(), data.size());
}
}
+ catch (const std::exception& ex)
+ {
+ LOG_ERR("Failed to send message [" << LOOLProtocol::getAbbreviatedMessage(data) <<
+ "] to " << getName() << ": " << ex.what());
+ }
}
- LOG_DBG(getName() << " SenderThread finished");
+ LOG_DBG(getName() << " ClientSession: performed write");
}
bool ClientSession::handleKitToClientMessage(const char* buffer, const int length)
diff --git a/wsd/ClientSession.hpp b/wsd/ClientSession.hpp
index b2ecc8a..97f5591 100644
--- a/wsd/ClientSession.hpp
+++ b/wsd/ClientSession.hpp
@@ -112,6 +112,9 @@ public:
/// Set WOPI fileinfo object
void setWopiFileInfo(std::unique_ptr<WopiStorage::WOPIFileInfo>& wopiFileInfo) { _wopiFileInfo = std::move(wopiFileInfo); }
+ bool hasQueuedWrites() const override;
+ void performWrites() override;
+
private:
virtual bool _handleInput(const char* buffer, int length) override;
@@ -138,8 +141,6 @@ private:
/// Eg. in readonly mode only few messages should be allowed
bool filterMessage(const std::string& msg) const;
- void senderThread();
-
private:
std::weak_ptr<DocumentBroker> _docBroker;
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index 51d4430..68bc905 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -1800,6 +1800,25 @@ private:
}
}
+ bool hasQueuedWrites() const override
+ {
+ // FIXME: - the session should be owning the fd in DocumentBroker's _poll
+ if (_clientSession)
+ return _clientSession->hasQueuedWrites();
+
+ LOG_TRC("ClientRequestDispatcher - asked for queued writes");
+ return false;
+ }
+
+ void performWrites() override
+ {
+ // FIXME: - the session should be owning the fd in DocumentBroker's _poll
+ if (_clientSession)
+ return _clientSession->performWrites();
+
+ assert (false);
+ }
+
void handleFileServerRequest(const Poco::Net::HTTPRequest& request, Poco::MemoryInputStream& message)
{
auto socket = _socket.lock();
diff --git a/wsd/SenderQueue.hpp b/wsd/SenderQueue.hpp
index c2dc60c..26bc8dc 100644
--- a/wsd/SenderQueue.hpp
+++ b/wsd/SenderQueue.hpp
@@ -26,6 +26,8 @@
#include "Log.hpp"
#include "TileDesc.hpp"
+#include "Socket.hpp" // FIXME: hack for wakeup-world ...
+
struct SendItem
{
std::weak_ptr<LOOLWebSocket> Socket;
@@ -55,6 +57,7 @@ public:
size_t enqueue(const Item& item)
{
std::unique_lock<std::mutex> lock(_mutex);
+ bool wasEmpty = _queue.empty();
if (!stopping())
{
if (deduplicate(item))
@@ -67,6 +70,11 @@ public:
lock.unlock();
_cv.notify_one();
+ if (wasEmpty)
+ {
+ // FIXME: Horrible hack - we need to wakeup just our own poll ...
+ SocketPoll::wakeupWorld();
+ }
return queuesize;
}
More information about the Libreoffice-commits
mailing list