[Libreoffice-commits] online.git: 2 commits - loolwsd/LOOLKit.cpp loolwsd/MessageQueue.cpp loolwsd/MessageQueue.hpp
Jan Holesovsky
kendy at collabora.com
Mon Sep 26 02:47:16 UTC 2016
loolwsd/LOOLKit.cpp | 163 ++++++++++++++---------------------------------
loolwsd/MessageQueue.cpp | 87 +++++++++----------------
loolwsd/MessageQueue.hpp | 6 -
3 files changed, 82 insertions(+), 174 deletions(-)
New commits:
commit c97ebfc585f213cb897b7c7e4a948cb1332d6169
Author: Jan Holesovsky <kendy at collabora.com>
Date: Fri Sep 23 10:18:54 2016 +0200
Prioritize the tile requests when extracting from the TileQueue.
When putting tiles into the queue, do only the de-duplication,
reprioritization is better at the get() time - no need to keep shuffling
the priorities according to the cursor moves etc. all the time.
The tile combination then does the rest of the work for us :-)
Change-Id: I100c433dd3b24228d1ca8e4c89891635db1115c1
Reviewed-on: https://gerrit.libreoffice.org/29283
Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
Tested-by: Ashod Nakashian <ashnakash at gmail.com>
diff --git a/loolwsd/MessageQueue.cpp b/loolwsd/MessageQueue.cpp
index 459098f..2f46e10 100644
--- a/loolwsd/MessageQueue.cpp
+++ b/loolwsd/MessageQueue.cpp
@@ -135,8 +135,15 @@ void TileQueue::put_impl(const Payload& value)
return;
}
+ // TODO because we are doing tile combining ourselves in the get_impl(),
+ // we could split the "tilecombine" messages into separate tiles here;
+ // could lead to some improvements in case we are getting subsequent
+ // tilecombines with overlapping, but not completely same areas.
+
if (!_queue.empty())
{
+ // TODO probably we could do the same with the invalidation callbacks
+ // (later one wins).
if (msg.compare(0, 4, "tile") == 0 || msg.compare(0, 10, "tilecombine") == 0)
{
const auto newMsg = msg.substr(0, msg.find(" ver"));
@@ -146,65 +153,17 @@ void TileQueue::put_impl(const Payload& value)
auto& it = _queue[i];
const std::string old(it.data(), it.size());
const auto oldMsg = old.substr(0, old.find(" ver"));
- Log::trace() << "TileQueue #" << i << ": " << old << Log::end;
if (newMsg == oldMsg)
{
- Log::debug() << "Replacing duplicate tile: " << old << " -> " << msg << Log::end;
- _queue[i] = value;
-
- if (priority(msg))
- {
- // Bump to top.
- Log::debug() << "And bumping tile to top: " << msg << Log::end;
- _queue.erase(_queue.begin() + i);
- _queue.push_front(value);
- }
-
- return;
+ Log::debug() << "Remove duplicate message: " << old << " -> " << msg << Log::end;
+ _queue.erase(_queue.begin() + i);
+ break;
}
}
}
}
- if (priority(msg))
- {
- Log::debug() << "Priority tile [" << msg << "]" << Log::end;
- _queue.push_front(value);
- }
- else
- {
- BasicTileQueue::put_impl(value);
- }
-}
-
-/// Bring the underlying tile (if any) to the top.
-/// There should be only one overlapping tile at most.
-void TileQueue::reprioritize(const CursorPosition& cursorPosition)
-{
- for (size_t i = 0; i < _queue.size(); ++i)
- {
- auto it = _queue[i];
- const std::string msg(it.data(), it.size());
- if (msg.compare(0, 5, "tile ") != 0)
- {
- continue;
- }
-
- auto tile = TileDesc::parse(msg); //FIXME: Expensive, avoid.
-
- if (tile.intersectsWithRect(cursorPosition.X, cursorPosition.Y, cursorPosition.Width, cursorPosition.Height))
- {
- if (i != 0)
- {
- // Bump to top.
- Log::trace() << "Bumping tile to top: " << msg << Log::end;
- _queue.erase(_queue.begin() + i);
- _queue.push_front(it);
- }
-
- return;
- }
- }
+ BasicTileQueue::put_impl(value);
}
bool TileQueue::priority(const std::string& tileMsg)
@@ -231,7 +190,6 @@ MessageQueue::Payload TileQueue::get_impl()
{
std::vector<TileDesc> tiles;
const auto front = _queue.front();
- _queue.pop_front();
auto msg = std::string(front.data(), front.size());
Log::trace() << "MessageQueue Get, Size: " << _queue.size() << ", Front: " << msg << Log::end;
@@ -240,9 +198,30 @@ MessageQueue::Payload TileQueue::get_impl()
{
// Don't combine non-tiles or tiles with id.
Log::trace() << "MessageQueue res: " << msg << Log::end;
+ _queue.pop_front();
return front;
}
+ // We are handling a tile; first try to find one that is at the cursor's
+ // position, otherwise handle the one that is at the front
+ bool foundPrioritized = false;
+ for (size_t i = 0; i < _queue.size(); ++i)
+ {
+ auto& it = _queue[i];
+ const std::string prio(it.data(), it.size());
+ if (priority(prio))
+ {
+ Log::debug() << "Handling a priority message: " << prio << Log::end;
+ _queue.erase(_queue.begin() + i);
+ msg = prio;
+ foundPrioritized = true;
+ break;
+ }
+ }
+
+ if (!foundPrioritized)
+ _queue.pop_front();
+
tiles.emplace_back(TileDesc::parse(msg));
// Combine as many tiles as possible with the top one.
@@ -263,7 +242,7 @@ MessageQueue::Payload TileQueue::get_impl()
}
auto tile2 = TileDesc::parse(msg);
- Log::trace() << "combining?: " << msg << Log::end;
+ Log::trace() << "combining candidate: " << msg << Log::end;
// Check if adjacent tiles.
bool found = false;
diff --git a/loolwsd/MessageQueue.hpp b/loolwsd/MessageQueue.hpp
index 4d1efba..89d314e 100644
--- a/loolwsd/MessageQueue.hpp
+++ b/loolwsd/MessageQueue.hpp
@@ -112,8 +112,6 @@ public:
{
_cursorPositions[viewId] = cursorPosition;
}
-
- reprioritize(cursorPosition);
}
void removeCursorPosition(int viewId)
@@ -128,10 +126,6 @@ protected:
private:
- /// Bring the underlying tile (if any) to the top.
- /// There should be only one overlapping tile at most.
- void reprioritize(const CursorPosition& cursorPosition);
-
/// Check if the given tile msg underlies a cursor.
bool priority(const std::string& tileMsg);
commit 61b0ad4933550289017a7f59b436299ff01155af
Author: Jan Holesovsky <kendy at collabora.com>
Date: Thu Sep 22 21:59:12 2016 +0200
Fix a race between the tile rendering and invalidation callbacks.
Instead of 2 queues and 2 threads, merge those to one - which gives a perfect
ordering of the invalidations and rendering.
Change-Id: I229dfc08b43e6ce7e4f08ea8059d3298d9bf8f8a
Reviewed-on: https://gerrit.libreoffice.org/29282
Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
Tested-by: Ashod Nakashian <ashnakash at gmail.com>
diff --git a/loolwsd/LOOLKit.cpp b/loolwsd/LOOLKit.cpp
index 7c9f451..5633536 100644
--- a/loolwsd/LOOLKit.cpp
+++ b/loolwsd/LOOLKit.cpp
@@ -368,26 +368,6 @@ private:
std::atomic<bool> _joined;
};
-/// Worker callback notification object.
-/// Used to pass callback data to the worker
-/// thread to invoke sessions with the data.
-class CallbackNotification : public Notification
-{
-public:
- typedef AutoPtr<CallbackNotification> Ptr;
-
- CallbackNotification(const std::shared_ptr<ChildSession>& session, const int nType, const std::string& rPayload)
- : _session(session),
- _nType(nType),
- _aPayload(rPayload)
- {
- }
-
- const std::shared_ptr<ChildSession> _session;
- const int _nType;
- const std::string _aPayload;
-};
-
/// A document container.
/// Owns LOKitDocument instance and connections.
/// Manages the lifetime of a document.
@@ -430,7 +410,6 @@ public:
_docPasswordType(PasswordType::ToView),
_stop(false),
_isLoading(0),
- _tilesThread(tilesThread, this),
_clientViews(0)
{
Log::info("Document ctor for url [" + _url + "] on child [" + _jailId + "].");
@@ -446,11 +425,9 @@ public:
// Wait for the callback worker to finish.
_stop = true;
- _callbackQueue.wakeUpAll();
- _callbackThread.join();
_tileQueue->put("eof");
- _tilesThread.join();
+ _callbackThread.join();
// Flag all connections to stop.
for (auto aIterator : _connections)
@@ -875,38 +852,7 @@ private:
}
}
- // Forward to the same view only.
- // Demultiplexing is done by Core.
- // TODO: replace with a map to be faster.
- bool isFound = false;
- for (auto& it : pDescr->Doc->_connections)
- {
- auto session = it.second->getSession();
- if (session && session->getViewId() == pDescr->ViewId)
- {
- if (it.second->isRunning())
- {
- isFound = true;
- auto pNotif = new CallbackNotification(session, nType, payload);
- pDescr->Doc->_callbackQueue.enqueueNotification(pNotif);
- }
- else
- {
- Log::error() << "Connection thread for session " << it.second->getSessionId() << " for view "
- << pDescr->ViewId << " is not running. Dropping [" << LOKitHelper::kitCallbackTypeToString(nType)
- << "] payload [" << payload << "]." << Log::end;
- }
-
- break;
- }
- }
-
- if (!isFound)
- {
- Log::warn() << "Document::ViewCallback. The message [" << pDescr->ViewId
- << "] [" << LOKitHelper::kitCallbackTypeToString(nType)
- << "] [" << payload << "] is not sent to Master Session." << Log::end;
- }
+ pDescr->Doc->_tileQueue->put("callback " + std::to_string(pDescr->ViewId) + " " + std::to_string(nType) + " " + payload);
}
static void DocumentCallback(const int nType, const char* pPayload, void* pData)
@@ -924,18 +870,8 @@ private:
{
std::unique_lock<std::mutex> lock(_mutex);
- for (auto& it: _connections)
- {
- if (it.second->isRunning())
- {
- auto session = it.second->getSession();
- if (session)
- {
- auto pNotif = new CallbackNotification(session, nType, payload);
- _callbackQueue.enqueueNotification(pNotif);
- }
- }
- }
+ // "-1" means broadcast
+ _tileQueue->put("callback -1 " + std::to_string(nType) + " " + payload);
}
/// Load a document (or view) and register callbacks.
@@ -1230,55 +1166,17 @@ private:
void run() override
{
- Util::setThreadName("kit_callback");
-
- Log::debug("Thread started.");
-
- while (!_stop && !TerminationFlag)
- {
- Notification::Ptr aNotification(_callbackQueue.waitDequeueNotification());
- if (!_stop && !TerminationFlag && aNotification)
- {
- CallbackNotification::Ptr aCallbackNotification = aNotification.cast<CallbackNotification>();
- assert(aCallbackNotification);
-
- const auto nType = aCallbackNotification->_nType;
- try
- {
- aCallbackNotification->_session->loKitCallback(nType, aCallbackNotification->_aPayload);
- }
- catch (const Exception& exc)
- {
- Log::error() << "CallbackWorker::run: Exception while handling callback [" << LOKitHelper::kitCallbackTypeToString(nType) << "]: "
- << exc.displayText()
- << (exc.nested() ? " (" + exc.nested()->displayText() + ")" : "")
- << Log::end;
- }
- catch (const std::exception& exc)
- {
- Log::error("CallbackWorker::run: Exception while handling callback [" + LOKitHelper::kitCallbackTypeToString(nType) + "]: " + exc.what());
- }
- }
- else
- break;
- }
-
- Log::debug("Thread finished.");
- }
-
- static void tilesThread(Document* pThis)
- {
- Util::setThreadName("tile_renderer");
+ Util::setThreadName("lok_handler");
Log::debug("Thread started.");
try
{
- while (!pThis->_stop)
+ while (!_stop && !TerminationFlag)
{
- const auto input = pThis->_tileQueue->get();
+ const auto input = _tileQueue->get();
const std::string message(input.data(), input.size());
- StringTokenizer tokens(message, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
+ StringTokenizer tokens(message, " ");
if (tokens[0] == "eof")
{
@@ -1288,11 +1186,50 @@ private:
if (tokens[0] == "tile")
{
- pThis->renderTile(tokens, pThis->_ws);
+ renderTile(tokens, _ws);
}
else if (tokens[0] == "tilecombine")
{
- pThis->renderCombinedTiles(tokens, pThis->_ws);
+ renderCombinedTiles(tokens, _ws);
+ }
+ else if (tokens[0] == "callback")
+ {
+ int viewId = std::stoi(tokens[1]); // -1 means broadcast
+ int type = std::stoi(tokens[2]);
+
+ // payload is the rest of the message
+ std::string payload(message.substr(tokens[0].length() + tokens[1].length() + tokens[2].length() + 3));
+
+ // Forward the callback to the same view, demultiplexing is done by the LibreOffice core.
+ // TODO: replace with a map to be faster.
+ bool isFound = false;
+ for (auto& it : _connections)
+ {
+ auto session = it.second->getSession();
+ if (session && ((session->getViewId() == viewId) || (viewId == -1)))
+ {
+ if (it.second->isRunning())
+ {
+ isFound = true;
+ session->loKitCallback(type, payload);
+ }
+ else
+ {
+ Log::error() << "Connection thread for session " << it.second->getSessionId() << " for view "
+ << viewId << " is not running. Dropping [" << LOKitHelper::kitCallbackTypeToString(type)
+ << "] payload [" << payload << "]." << Log::end;
+ }
+
+ break;
+ }
+ }
+
+ if (!isFound)
+ {
+ Log::warn() << "Document::ViewCallback. The message [" << viewId
+ << "] [" << LOKitHelper::kitCallbackTypeToString(type)
+ << "] [" << payload << "] is not sent to Master Session." << Log::end;
+ }
}
else
{
@@ -1336,8 +1273,6 @@ private:
std::map<int, std::unique_ptr<CallbackDescriptor>> _viewIdToCallbackDescr;
std::map<unsigned, std::shared_ptr<Connection>> _connections;
Poco::Thread _callbackThread;
- std::thread _tilesThread;
- Poco::NotificationQueue _callbackQueue;
std::atomic_size_t _clientViews;
};
More information about the Libreoffice-commits
mailing list