[Libreoffice-commits] online.git: common/Common.hpp common/IoUtil.cpp common/LOOLWebSocket.hpp configure.ac kit/ForKit.cpp kit/Kit.cpp net/DelaySocket.cpp net/ServerSocket.hpp net/Socket.cpp net/Socket.hpp net/SslSocket.hpp net/WebSocketHandler.hpp test/helpers.hpp test/UnitWOPIDocumentConflict.cpp tools/WebSocketDump.cpp wsd/Admin.cpp wsd/Admin.hpp wsd/DocumentBroker.cpp wsd/LOOLWSD.cpp
Michael Meeks (via logerrit)
logerrit at kemper.freedesktop.org
Fri Apr 10 08:06:42 UTC 2020
common/Common.hpp | 2
common/IoUtil.cpp | 4 -
common/LOOLWebSocket.hpp | 2
configure.ac | 2
kit/ForKit.cpp | 2
kit/Kit.cpp | 15 +---
net/DelaySocket.cpp | 14 +--
net/ServerSocket.hpp | 4 -
net/Socket.cpp | 140 ++++++++++++++++++++++++++++++++++++-
net/Socket.hpp | 142 +++-----------------------------------
net/SslSocket.hpp | 6 -
net/WebSocketHandler.hpp | 24 +++---
test/UnitWOPIDocumentConflict.cpp | 2
test/helpers.hpp | 4 -
tools/WebSocketDump.cpp | 6 -
wsd/Admin.cpp | 38 +++-------
wsd/Admin.hpp | 9 +-
wsd/DocumentBroker.cpp | 12 +--
wsd/LOOLWSD.cpp | 16 ++--
19 files changed, 224 insertions(+), 220 deletions(-)
New commits:
commit 5710c8632383e92372e1d81b6e26acc975e25ec4
Author: Michael Meeks <michael.meeks at collabora.com>
AuthorDate: Thu Apr 9 14:43:51 2020 +0100
Commit: Michael Meeks <michael.meeks at collabora.com>
CommitDate: Fri Apr 10 10:06:23 2020 +0200
Poll - switch to ppoll for closer to microsecond accuracy.
Change-Id: Ib8a2bb6f60302df8631edadbbb8db626894c457c
Reviewed-on: https://gerrit.libreoffice.org/c/online/+/92000
Tested-by: Jenkins CollaboraOffice <jenkinscollaboraoffice at gmail.com>
Reviewed-by: Michael Meeks <michael.meeks at collabora.com>
diff --git a/common/Common.hpp b/common/Common.hpp
index 82f848579..ab9989e73 100644
--- a/common/Common.hpp
+++ b/common/Common.hpp
@@ -18,7 +18,7 @@ constexpr int DEFAULT_CLIENT_PORT_NUMBER = 9980;
constexpr int COMMAND_TIMEOUT_MS = 5000;
constexpr int CHILD_TIMEOUT_MS = COMMAND_TIMEOUT_MS;
constexpr int CHILD_REBALANCE_INTERVAL_MS = CHILD_TIMEOUT_MS / 10;
-constexpr int POLL_TIMEOUT_MS = COMMAND_TIMEOUT_MS / 5;
+constexpr int POLL_TIMEOUT_MICRO_S = (COMMAND_TIMEOUT_MS / 5) * 1000;
constexpr int WS_SEND_TIMEOUT_MS = 1000;
constexpr int TILE_ROUNDTRIP_TIMEOUT_MS = 5000;
diff --git a/common/IoUtil.cpp b/common/IoUtil.cpp
index 209d0f5d3..f7ea25225 100644
--- a/common/IoUtil.cpp
+++ b/common/IoUtil.cpp
@@ -49,7 +49,7 @@ void SocketProcessor(const std::shared_ptr<LOOLWebSocket>& ws,
LOG_INF("SocketProcessor [" << name << "] starting.");
// Timeout given is in microseconds.
- static const Poco::Timespan waitTime(POLL_TIMEOUT_MS * 1000);
+ static const Poco::Timespan waitTime(POLL_TIMEOUT_MICRO_S);
int flags = 0;
int n = -1;
bool stop = false;
@@ -244,7 +244,7 @@ int PipeReader::readLine(std::string& line,
// Poll in short intervals to check for stop condition.
const int pollTimeoutMs = 500;
- int maxPollCount = std::max<int>(POLL_TIMEOUT_MS / pollTimeoutMs, 1);
+ int maxPollCount = std::max<int>((POLL_TIMEOUT_MICRO_S / 1000) / pollTimeoutMs, 1);
while (maxPollCount-- > 0)
{
if (stopPredicate())
diff --git a/common/LOOLWebSocket.hpp b/common/LOOLWebSocket.hpp
index 6d3bd42cf..2c0559593 100644
--- a/common/LOOLWebSocket.hpp
+++ b/common/LOOLWebSocket.hpp
@@ -53,7 +53,7 @@ public:
int receiveFrame(char* buffer, const int length, int& flags)
{
// Timeout is in microseconds. We don't need this, except to yield the cpu.
- static const Poco::Timespan waitTime(POLL_TIMEOUT_MS * 1000 / 10);
+ static const Poco::Timespan waitTime(POLL_TIMEOUT_MICRO_S / 10);
static const Poco::Timespan waitZero(0);
while (poll(waitTime, Poco::Net::Socket::SELECT_READ))
diff --git a/configure.ac b/configure.ac
index 1edd5408f..7d795bce8 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1031,6 +1031,8 @@ AS_IF([test "$ENABLE_IOSAPP" = "true"],
])
AC_SUBST(IOSAPP_FONTS)
+AC_CHECK_FUNCS(ppoll)
+
ENABLE_CYPRESS=false
if test "$enable_cypress" = "yes"; then
cypress_msg="cypress is enabled"
diff --git a/kit/ForKit.cpp b/kit/ForKit.cpp
index bea72f4d5..5506fa339 100644
--- a/kit/ForKit.cpp
+++ b/kit/ForKit.cpp
@@ -606,7 +606,7 @@ int main(int argc, char** argv)
{
UnitKit::get().invokeForKitTest();
- mainPoll.poll(POLL_TIMEOUT_MS);
+ mainPoll.ppoll(POLL_TIMEOUT_MICRO_S);
#if ENABLE_DEBUG
if (!SingleKit)
diff --git a/kit/Kit.cpp b/kit/Kit.cpp
index e99ec8e61..acec37bb9 100644
--- a/kit/Kit.cpp
+++ b/kit/Kit.cpp
@@ -2245,7 +2245,7 @@ public:
// a LOK compatible poll function merging the functions.
// returns the number of events signalled
- int kitPoll(int timeoutUs)
+ int kitPoll(int timeoutMicroS)
{
if (SigUtil::getTerminationFlag())
{
@@ -2257,30 +2257,29 @@ public:
int maxExtraEvents = 15;
int eventsSignalled = 0;
- int timeoutMs = timeoutUs / 1000;
- if (timeoutMs < 0)
+ if (timeoutMicroS < 0)
{
// Flush at most 1 + maxExtraEvents, or return when nothing left.
- while (poll(0) > 0 && maxExtraEvents-- > 0)
+ while (ppoll(0) > 0 && maxExtraEvents-- > 0)
++eventsSignalled;
}
else
{
// Flush at most maxEvents+1, or return when nothing left.
- _pollEnd = std::chrono::steady_clock::now() + std::chrono::microseconds(timeoutUs);
+ _pollEnd = std::chrono::steady_clock::now() + std::chrono::microseconds(timeoutMicroS);
do
{
- if (poll(timeoutMs) <= 0)
+ if (ppoll(timeoutMicroS) <= 0)
break;
const auto now = std::chrono::steady_clock::now();
drainQueue(now);
- timeoutMs = std::chrono::duration_cast<std::chrono::milliseconds>(_pollEnd - now).count();
+ timeoutMicroS = std::chrono::duration_cast<std::chrono::microseconds>(_pollEnd - now).count();
++eventsSignalled;
}
- while (timeoutMs > 0 && !SigUtil::getTerminationFlag() && maxExtraEvents-- > 0);
+ while (timeoutMicroS > 0 && !SigUtil::getTerminationFlag() && maxExtraEvents-- > 0);
}
drainQueue(std::chrono::steady_clock::now());
diff --git a/net/DelaySocket.cpp b/net/DelaySocket.cpp
index cfd8b7e54..b07b46cc4 100644
--- a/net/DelaySocket.cpp
+++ b/net/DelaySocket.cpp
@@ -77,17 +77,17 @@ public:
// FIXME - really need to propagate 'noDelay' etc.
// have a debug only lookup of delayed sockets for this case ?
- int getPollEvents(std::chrono::steady_clock::time_point now,
- int &timeoutMaxMs) override
+ int pgetPollEvents(std::chrono::steady_clock::time_point now,
+ int64_t &timeoutMaxMicroS) override
{
if (_chunks.size() > 0)
{
- int remainingMs = std::chrono::duration_cast<std::chrono::milliseconds>(
+ int64_t remainingMicroS = std::chrono::duration_cast<std::chrono::microseconds>(
(*_chunks.begin())->getSendTime() - now).count();
- if (remainingMs < timeoutMaxMs)
- DELAY_LOG("#" << getFD() << " reset timeout max to " << remainingMs
- << "ms from " << timeoutMaxMs << "ms\n");
- timeoutMaxMs = std::min(timeoutMaxMs, remainingMs);
+ if (remainingMicroS < timeoutMaxMicroS)
+ DELAY_LOG("#" << getFD() << " reset timeout max to " << remainingMicroS
+ << "us from " << timeoutMaxMicroS << "us\n");
+ timeoutMaxMicroS = std::min(timeoutMaxMicroS, remainingMicroS);
}
if (_chunks.size() > 0 &&
diff --git a/net/ServerSocket.hpp b/net/ServerSocket.hpp
index 1017aeb5e..65f826913 100644
--- a/net/ServerSocket.hpp
+++ b/net/ServerSocket.hpp
@@ -65,8 +65,8 @@ public:
/// Returns a valid Socket shared_ptr on success only.
virtual std::shared_ptr<Socket> accept();
- int getPollEvents(std::chrono::steady_clock::time_point /* now */,
- int & /* timeoutMaxMs */) override
+ int pgetPollEvents(std::chrono::steady_clock::time_point /* now */,
+ int64_t & /* timeoutMaxMicroS */) override
{
return POLLIN;
}
diff --git a/net/Socket.cpp b/net/Socket.cpp
index 5bb1fa250..47ae1ad41 100644
--- a/net/Socket.cpp
+++ b/net/Socket.cpp
@@ -35,7 +35,7 @@
#endif
#include "WebSocketHandler.hpp"
-int SocketPoll::DefaultPollTimeoutMs = 5000;
+int SocketPoll::DefaultPollTimeoutMicroS = 5000 * 1000;
std::atomic<bool> SocketPoll::InhibitThreadChecks(false);
std::atomic<bool> Socket::InhibitThreadChecks(false);
@@ -194,6 +194,136 @@ void SocketPoll::pollingThreadEntry()
LOG_INF("Finished polling thread [" << _name << "].");
}
+int SocketPoll::ppoll(int64_t timeoutMaxMicroS)
+{
+ if (_runOnClientThread)
+ checkAndReThread();
+ else
+ assertCorrectThread();
+
+ std::chrono::steady_clock::time_point now =
+ std::chrono::steady_clock::now();
+
+ // The events to poll on change each spin of the loop.
+ psetupPollFds(now, timeoutMaxMicroS);
+ const size_t size = _pollSockets.size();
+
+ int rc;
+ do
+ {
+#if !MOBILEAPP
+# if HAVE_PPOLL
+ LOG_TRC("ppoll start, timeoutMicroS: " << timeoutMaxMicroS << " size " << size);
+ timeoutMaxMicroS = std::max(timeoutMaxMicroS, (int64_t)0);
+ struct timespec timeout;
+ timeout.tv_sec = timeoutMaxMicroS / (1000 * 1000);
+ timeout.tv_nsec = (timeoutMaxMicroS % (1000 * 1000)) * 1000;
+ rc = ::ppoll(&_pollFds[0], size + 1, &timeout, nullptr);
+ LOG_TRC("ppoll result " << rc << " errno " << strerror(errno));
+# else
+ int timeoutMaxMs = (timeoutMaxMicroS + 9999) / 1000;
+ LOG_TRC("Legacy Poll start, timeoutMs: " << timeoutMaxMs);
+ rc = ::poll(&_pollFds[0], size + 1, std::max(timeoutMaxMs,0));
+# endif
+#else
+ LOG_TRC("SocketPoll Poll");
+ int timeoutMaxMs = (timeoutMaxMicroS + 9999) / 1000;
+ rc = fakeSocketPoll(&_pollFds[0], size + 1, std::max(timeoutMaxMs,0));
+#endif
+ }
+ while (rc < 0 && errno == EINTR);
+ LOG_TRC("Poll completed with " << rc << " live polls max (" <<
+ timeoutMaxMicroS << "us)" << ((rc==0) ? "(timedout)" : ""));
+
+ // First process the wakeup pipe (always the last entry).
+ if (_pollFds[size].revents)
+ {
+ std::vector<CallbackFn> invoke;
+ {
+ std::lock_guard<std::mutex> lock(_mutex);
+
+ // Clear the data.
+#if !MOBILEAPP
+ int dump = ::read(_wakeup[0], &dump, sizeof(dump));
+#else
+ LOG_TRC("Wakeup pipe read");
+ int dump = fakeSocketRead(_wakeup[0], &dump, sizeof(dump));
+#endif
+ // Copy the new sockets over and clear.
+ _pollSockets.insert(_pollSockets.end(),
+ _newSockets.begin(), _newSockets.end());
+
+ // Update thread ownership.
+ for (auto &i : _newSockets)
+ i->setThreadOwner(std::this_thread::get_id());
+
+ _newSockets.clear();
+
+ // Extract list of callbacks to process
+ std::swap(_newCallbacks, invoke);
+ }
+
+ for (const auto& callback : invoke)
+ {
+ try
+ {
+ callback();
+ }
+ catch (const std::exception& exc)
+ {
+ LOG_ERR("Exception while invoking poll [" << _name <<
+ "] callback: " << exc.what());
+ }
+ }
+
+ try
+ {
+ wakeupHook();
+ }
+ catch (const std::exception& exc)
+ {
+ LOG_ERR("Exception while invoking poll [" << _name <<
+ "] wakeup hook: " << exc.what());
+ }
+ }
+
+ // This should only happen when we're stopping.
+ if (_pollSockets.size() != size)
+ return rc;
+
+ // Fire the poll callbacks and remove dead fds.
+ std::chrono::steady_clock::time_point newNow =
+ std::chrono::steady_clock::now();
+
+ for (int i = static_cast<int>(size) - 1; i >= 0; --i)
+ {
+ SocketDisposition disposition(_pollSockets[i]);
+ try
+ {
+ _pollSockets[i]->handlePoll(disposition, newNow,
+ _pollFds[i].revents);
+ }
+ catch (const std::exception& exc)
+ {
+ LOG_ERR("Error while handling poll for socket #" <<
+ _pollFds[i].fd << " in " << _name << ": " << exc.what());
+ disposition.setClosed();
+ rc = -1;
+ }
+
+ if (disposition.isMove() || disposition.isClosed())
+ {
+ LOG_DBG("Removing socket #" << _pollFds[i].fd << " (of " <<
+ _pollSockets.size() << ") from " << _name);
+ _pollSockets.erase(_pollSockets.begin() + i);
+ }
+
+ disposition.execute();
+ }
+
+ return rc;
+}
+
void SocketPoll::wakeupWorld()
{
for (const auto& fd : getWakeupsArray())
@@ -384,8 +514,8 @@ void SocketDisposition::execute()
_socketMove = nullptr;
}
-const int WebSocketHandler::InitialPingDelayMs = 25;
-const int WebSocketHandler::PingFrequencyMs = 18 * 1000;
+const int WebSocketHandler::InitialPingDelayMicroS = 25 * 1000;
+const int WebSocketHandler::PingFrequencyMicroS = 18 * 1000 * 1000;
void WebSocketHandler::dumpState(std::ostream& os)
{
@@ -398,8 +528,8 @@ void WebSocketHandler::dumpState(std::ostream& os)
void StreamSocket::dumpState(std::ostream& os)
{
- int timeoutMaxMs = SocketPoll::DefaultPollTimeoutMs;
- int events = getPollEvents(std::chrono::steady_clock::now(), timeoutMaxMs);
+ int64_t timeoutMaxMicroS = SocketPoll::DefaultPollTimeoutMicroS;
+ int events = pgetPollEvents(std::chrono::steady_clock::now(), timeoutMaxMicroS);
os << "\t" << getFD() << "\t" << events << "\t"
<< _inBuffer.size() << "\t" << _outBuffer.size() << "\t"
<< " r: " << _bytesRecvd << "\t w: " << _bytesSent << "\t"
diff --git a/net/Socket.hpp b/net/Socket.hpp
index ab56e5d10..852012424 100644
--- a/net/Socket.hpp
+++ b/net/Socket.hpp
@@ -154,8 +154,8 @@ public:
/// Prepare our poll record; adjust @timeoutMaxMs downwards
/// for timeouts, based on current time @now.
/// @returns POLLIN and POLLOUT if output is expected.
- virtual int getPollEvents(std::chrono::steady_clock::time_point now,
- int &timeoutMaxMs) = 0;
+ virtual int pgetPollEvents(std::chrono::steady_clock::time_point now,
+ int64_t &timeoutMaxMicroS) = 0;
/// Handle results of events returned from poll
virtual void handlePoll(SocketDisposition &disposition,
@@ -370,8 +370,8 @@ public:
/// Prepare our poll record; adjust @timeoutMaxMs downwards
/// for timeouts, based on current time @now.
/// @returns POLLIN and POLLOUT if output is expected.
- virtual int getPollEvents(std::chrono::steady_clock::time_point now,
- int &timeoutMaxMs) = 0;
+ virtual int pgetPollEvents(std::chrono::steady_clock::time_point now,
+ int64_t &timeoutMaxMicroS) = 0;
/// Do we need to handle a timeout ?
virtual void checkTimeout(std::chrono::steady_clock::time_point /* now */) {}
@@ -476,7 +476,7 @@ public:
~SocketPoll();
/// Default poll time - useful to increase for debugging.
- static int DefaultPollTimeoutMs;
+ static int DefaultPollTimeoutMicroS;
static std::atomic<bool> InhibitThreadChecks;
/// Stop the polling thread.
@@ -533,7 +533,7 @@ public:
{
while (continuePolling())
{
- poll(DefaultPollTimeoutMs);
+ ppoll(DefaultPollTimeoutMicroS);
}
}
@@ -576,123 +576,7 @@ public:
/// Poll the sockets for available data to read or buffer to write.
/// Returns the return-value of poll(2): 0 on timeout,
/// -1 for error, and otherwise the number of events signalled.
- int poll(int timeoutMaxMs)
- {
- if (_runOnClientThread)
- checkAndReThread();
- else
- assertCorrectThread();
-
- std::chrono::steady_clock::time_point now =
- std::chrono::steady_clock::now();
-
- // The events to poll on change each spin of the loop.
- setupPollFds(now, timeoutMaxMs);
- const size_t size = _pollSockets.size();
-
- int rc;
- do
- {
-#if !MOBILEAPP
- LOG_TRC("Poll start, timeoutMs: " << timeoutMaxMs);
- rc = ::poll(&_pollFds[0], size + 1, std::max(timeoutMaxMs,0));
-#else
- LOG_TRC("SocketPoll Poll");
- rc = fakeSocketPoll(&_pollFds[0], size + 1, std::max(timeoutMaxMs,0));
-#endif
- }
- while (rc < 0 && errno == EINTR);
- LOG_TRC("Poll completed with " << rc << " live polls max (" <<
- timeoutMaxMs << "ms)" << ((rc==0) ? "(timedout)" : ""));
-
- // First process the wakeup pipe (always the last entry).
- if (_pollFds[size].revents)
- {
- std::vector<CallbackFn> invoke;
- {
- std::lock_guard<std::mutex> lock(_mutex);
-
- // Clear the data.
-#if !MOBILEAPP
- int dump = ::read(_wakeup[0], &dump, sizeof(dump));
-#else
- LOG_TRC("Wakeup pipe read");
- int dump = fakeSocketRead(_wakeup[0], &dump, sizeof(dump));
-#endif
- // Copy the new sockets over and clear.
- _pollSockets.insert(_pollSockets.end(),
- _newSockets.begin(), _newSockets.end());
-
- // Update thread ownership.
- for (auto &i : _newSockets)
- i->setThreadOwner(std::this_thread::get_id());
-
- _newSockets.clear();
-
- // Extract list of callbacks to process
- std::swap(_newCallbacks, invoke);
- }
-
- for (const auto& callback : invoke)
- {
- try
- {
- callback();
- }
- catch (const std::exception& exc)
- {
- LOG_ERR("Exception while invoking poll [" << _name <<
- "] callback: " << exc.what());
- }
- }
-
- try
- {
- wakeupHook();
- }
- catch (const std::exception& exc)
- {
- LOG_ERR("Exception while invoking poll [" << _name <<
- "] wakeup hook: " << exc.what());
- }
- }
-
- // This should only happen when we're stopping.
- if (_pollSockets.size() != size)
- return rc;
-
- // Fire the poll callbacks and remove dead fds.
- std::chrono::steady_clock::time_point newNow =
- std::chrono::steady_clock::now();
-
- for (int i = static_cast<int>(size) - 1; i >= 0; --i)
- {
- SocketDisposition disposition(_pollSockets[i]);
- try
- {
- _pollSockets[i]->handlePoll(disposition, newNow,
- _pollFds[i].revents);
- }
- catch (const std::exception& exc)
- {
- LOG_ERR("Error while handling poll for socket #" <<
- _pollFds[i].fd << " in " << _name << ": " << exc.what());
- disposition.setClosed();
- rc = -1;
- }
-
- if (disposition.isMove() || disposition.isClosed())
- {
- LOG_DBG("Removing socket #" << _pollFds[i].fd << " (of " <<
- _pollSockets.size() << ") from " << _name);
- _pollSockets.erase(_pollSockets.begin() + i);
- }
-
- disposition.execute();
- }
-
- return rc;
- }
+ int ppoll(int64_t timeoutMaxMicroS);
/// Write to a wakeup descriptor
static void wakeup (int fd)
@@ -811,8 +695,8 @@ private:
const std::string &pathAndQuery);
/// Initialize the poll fds array with the right events
- void setupPollFds(std::chrono::steady_clock::time_point now,
- int &timeoutMaxMs)
+ void psetupPollFds(std::chrono::steady_clock::time_point now,
+ int64_t &timeoutMaxMicroS)
{
const size_t size = _pollSockets.size();
@@ -820,7 +704,7 @@ private:
for (size_t i = 0; i < size; ++i)
{
- int events = _pollSockets[i]->getPollEvents(now, timeoutMaxMs);
+ int events = _pollSockets[i]->pgetPollEvents(now, timeoutMaxMicroS);
assert(events >= 0); // Or > 0 even?
_pollFds[i].fd = _pollSockets[i]->getFD();
_pollFds[i].events = events;
@@ -920,12 +804,12 @@ public:
Socket::shutdown();
}
- int getPollEvents(std::chrono::steady_clock::time_point now,
- int &timeoutMaxMs) override
+ int pgetPollEvents(std::chrono::steady_clock::time_point now,
+ int64_t &timeoutMaxMicroS) override
{
// cf. SslSocket::getPollEvents
assertCorrectThread();
- int events = _socketHandler->getPollEvents(now, timeoutMaxMs);
+ int events = _socketHandler->pgetPollEvents(now, timeoutMaxMicroS);
if (!_outBuffer.empty() || _shutdownSignalled)
events |= POLLOUT;
return events;
diff --git a/net/SslSocket.hpp b/net/SslSocket.hpp
index 27e075328..e6b7f908f 100644
--- a/net/SslSocket.hpp
+++ b/net/SslSocket.hpp
@@ -126,11 +126,11 @@ public:
return handleSslState(SSL_write(_ssl, buf, len));
}
- int getPollEvents(std::chrono::steady_clock::time_point now,
- int & timeoutMaxMs) override
+ int pgetPollEvents(std::chrono::steady_clock::time_point now,
+ int64_t & timeoutMaxMicroS) override
{
assertCorrectThread();
- int events = getSocketHandler()->getPollEvents(now, timeoutMaxMs);
+ int events = getSocketHandler()->pgetPollEvents(now, timeoutMaxMicroS);
if (_sslWantsTo == SslWantsTo::Read)
{
diff --git a/net/WebSocketHandler.hpp b/net/WebSocketHandler.hpp
index b23c3951f..24c3a839a 100644
--- a/net/WebSocketHandler.hpp
+++ b/net/WebSocketHandler.hpp
@@ -48,8 +48,8 @@ protected:
static const unsigned char Mask = 0x80;
};
- static const int InitialPingDelayMs;
- static const int PingFrequencyMs;
+ static const int InitialPingDelayMicroS;
+ static const int PingFrequencyMicroS;
public:
/// Perform upgrade ourselves, or select a client web socket.
@@ -81,8 +81,8 @@ public:
const Poco::Net::HTTPRequest& request)
: _socket(socket)
, _lastPingSentTime(std::chrono::steady_clock::now() -
- std::chrono::milliseconds(PingFrequencyMs) -
- std::chrono::milliseconds(InitialPingDelayMs))
+ std::chrono::microseconds(PingFrequencyMicroS) -
+ std::chrono::microseconds(InitialPingDelayMicroS))
, _pingTimeUs(0)
, _shuttingDown(false)
, _isClient(false)
@@ -430,14 +430,14 @@ public:
}
}
- int getPollEvents(std::chrono::steady_clock::time_point now,
- int & timeoutMaxMs) override
+ int pgetPollEvents(std::chrono::steady_clock::time_point now,
+ int64_t & timeoutMaxMicroS) override
{
if (!_isClient)
{
- const int timeSincePingMs =
- std::chrono::duration_cast<std::chrono::milliseconds>(now - _lastPingSentTime).count();
- timeoutMaxMs = std::min(timeoutMaxMs, PingFrequencyMs - timeSincePingMs);
+ const int64_t timeSincePingMicroS =
+ std::chrono::duration_cast<std::chrono::microseconds>(now - _lastPingSentTime).count();
+ timeoutMaxMicroS = std::min(timeoutMaxMicroS, PingFrequencyMicroS - timeSincePingMicroS);
}
int events = POLLIN;
if (_msgHandler && _msgHandler->hasQueuedMessages())
@@ -493,9 +493,9 @@ private:
if (_isClient)
return;
- const int timeSincePingMs =
- std::chrono::duration_cast<std::chrono::milliseconds>(now - _lastPingSentTime).count();
- if (timeSincePingMs >= PingFrequencyMs)
+ const int64_t timeSincePingMicroS =
+ std::chrono::duration_cast<std::chrono::microseconds>(now - _lastPingSentTime).count();
+ if (timeSincePingMicroS >= PingFrequencyMicroS)
{
const std::shared_ptr<StreamSocket> socket = _socket.lock();
if (socket)
diff --git a/test/UnitWOPIDocumentConflict.cpp b/test/UnitWOPIDocumentConflict.cpp
index b1aed94bc..de6b2d97f 100644
--- a/test/UnitWOPIDocumentConflict.cpp
+++ b/test/UnitWOPIDocumentConflict.cpp
@@ -98,7 +98,7 @@ public:
// ModifiedStatus=true is a bit slow; let's sleep and hope that
// it is received before we wake up
- std::this_thread::sleep_for(std::chrono::milliseconds(POLL_TIMEOUT_MS));
+ std::this_thread::sleep_for(std::chrono::microseconds(POLL_TIMEOUT_MICRO_S));
// change the document underneath, in storage
setFileContent("Modified content in storage");
diff --git a/test/helpers.hpp b/test/helpers.hpp
index 1b831a289..748e5572d 100644
--- a/test/helpers.hpp
+++ b/test/helpers.hpp
@@ -275,7 +275,7 @@ int getErrorCode(LOOLWebSocket& ws, std::string& message, const std::string& tes
{
bytes = ws.receiveFrame(buffer.begin(), READ_BUFFER_SIZE, flags);
TST_LOG("Got " << LOOLProtocol::getAbbreviatedFrameDump(buffer.begin(), bytes, flags));
- std::this_thread::sleep_for(std::chrono::milliseconds(POLL_TIMEOUT_MS));
+ std::this_thread::sleep_for(std::chrono::microseconds(POLL_TIMEOUT_MICRO_S));
}
while (bytes > 0 && (flags & Poco::Net::WebSocket::FRAME_OP_BITMASK) != Poco::Net::WebSocket::FRAME_OP_CLOSE);
@@ -463,7 +463,7 @@ connectLOKit(const Poco::URI& uri,
TST_LOG("Connection problem: " << ex.what());
}
- std::this_thread::sleep_for(std::chrono::milliseconds(POLL_TIMEOUT_MS));
+ std::this_thread::sleep_for(std::chrono::microseconds(POLL_TIMEOUT_MICRO_S));
}
while (retries--);
diff --git a/tools/WebSocketDump.cpp b/tools/WebSocketDump.cpp
index c699a8fed..822a256e8 100644
--- a/tools/WebSocketDump.cpp
+++ b/tools/WebSocketDump.cpp
@@ -175,8 +175,8 @@ private:
in.erase(in.begin(), itBody);
}
- int getPollEvents(std::chrono::steady_clock::time_point /* now */,
- int & /* timeoutMaxMs */) override
+ int pgetPollEvents(std::chrono::steady_clock::time_point /* now */,
+ int64_t & /* timeoutMaxMicroS */) override
{
return POLLIN;
}
@@ -290,7 +290,7 @@ int main (int argc, char **argv)
while (true)
{
- DumpSocketPoll.poll(100 * 1000);
+ DumpSocketPoll.ppoll(100 * 1000 * 1000);
}
}
diff --git a/wsd/Admin.cpp b/wsd/Admin.cpp
index 3fc0b6955..ea905827c 100644
--- a/wsd/Admin.cpp
+++ b/wsd/Admin.cpp
@@ -24,6 +24,7 @@
#include <Common.hpp>
#include "FileServer.hpp"
#include <IoUtil.hpp>
+#include "LOOLWSD.hpp"
#include <Log.hpp>
#include <Protocol.hpp>
#include "Storage.hpp"
@@ -157,24 +158,11 @@ void AdminSocketHandler::handleMessage(const std::vector<char> &payload)
{
const int pid = std::stoi(tokens[1]);
LOG_INF("Admin request to kill PID: " << pid);
-
- std::set<pid_t> pids = model.getDocumentPids();
- if (pids.find(pid) != pids.end())
- {
- SigUtil::killChild(pid);
- }
- else
- {
- LOG_WRN("Invalid PID to kill (not a document pid)");
- }
+ SigUtil::killChild(pid);
}
catch (std::invalid_argument& exc)
{
- LOG_WRN("Invalid PID to kill (invalid argument): " << tokens[1]);
- }
- catch (std::out_of_range& exc)
- {
- LOG_WRN("Invalid PID to kill (out of range): " << tokens[1]);
+ LOG_WRN("Invalid PID to kill: " << tokens[1]);
}
}
else if (tokens.equals(0, "settings"))
@@ -295,11 +283,7 @@ AdminSocketHandler::AdminSocketHandler(Admin* adminManager)
void AdminSocketHandler::sendTextFrame(const std::string& message)
{
- if (!Util::isFuzzing())
- {
- UnitWSD::get().onAdminQueryMessage(message);
- }
-
+ UnitWSD::get().onAdminQueryMessage(message);
if (_isAuthenticated)
{
LOG_TRC("send admin text frame '" << message << "'");
@@ -360,6 +344,7 @@ Admin::Admin() :
SocketPoll("admin"),
_model(AdminModel()),
_forKitPid(-1),
+ _forKitWritePipe(-1),
_lastTotalMemory(0),
_lastJiffies(0),
_lastSentCount(0),
@@ -470,7 +455,7 @@ void Admin::pollingThread()
// Handle websockets & other work.
const int timeout = capAndRoundInterval(std::min(std::min(cpuWait, memWait), netWait));
LOG_TRC("Admin poll for " << timeout << "ms.");
- poll(timeout);
+ ppoll(timeout * 1000); // continue with ms for admin, settings etc.
}
}
@@ -606,7 +591,10 @@ void Admin::notifyForkit()
<< "setconfig limit_file_size_mb " << _defDocProcSettings.getLimitFileSizeMb() << '\n'
<< "setconfig limit_num_open_files " << _defDocProcSettings.getLimitNumberOpenFiles() << '\n';
- LOOLWSD::sendMessageToForKit(oss.str());
+ if (_forKitWritePipe != -1)
+ IoUtil::writeToPipe(_forKitWritePipe, oss.str());
+ else
+ LOG_INF("Forkit write pipe not set (yet).");
}
void Admin::triggerMemoryCleanup(const size_t totalMem)
@@ -675,8 +663,8 @@ public:
_uri(uri)
{
}
- int getPollEvents(std::chrono::steady_clock::time_point now,
- int &timeoutMaxMs) override
+ int pgetPollEvents(std::chrono::steady_clock::time_point now,
+ int64_t &timeoutMaxMicroS) override
{
if (_connecting)
{
@@ -684,7 +672,7 @@ public:
return POLLOUT;
}
else
- return AdminSocketHandler::getPollEvents(now, timeoutMaxMs);
+ return AdminSocketHandler::pgetPollEvents(now, timeoutMaxMicroS);
}
void performWrites() override
diff --git a/wsd/Admin.hpp b/wsd/Admin.hpp
index 6287d38bc..a418040c6 100644
--- a/wsd/Admin.hpp
+++ b/wsd/Admin.hpp
@@ -16,7 +16,6 @@
#include "Log.hpp"
#include "net/WebSocketHandler.hpp"
-#include "LOOLWSD.hpp"
class Admin;
@@ -39,13 +38,13 @@ public:
static void subscribeAsync(const std::shared_ptr<AdminSocketHandler>& handler);
- /// Process incoming websocket messages
- void handleMessage(const std::vector<char> &data) override;
-
private:
/// Sends text frames simply to authenticated clients.
void sendTextFrame(const std::string& message);
+ /// Process incoming websocket messages
+ void handleMessage(const std::vector<char> &data) override;
+
private:
Admin* _admin;
int _sessionId;
@@ -92,6 +91,7 @@ public:
void rmDoc(const std::string& docKey);
void setForKitPid(const int forKitPid) { _forKitPid = forKitPid; _model.setForKitPid(forKitPid);}
+ void setForKitWritePipe(const int forKitWritePipe) { _forKitWritePipe = forKitWritePipe; }
/// Callers must ensure that modelMutex is acquired
AdminModel& getModel();
@@ -157,6 +157,7 @@ private:
/// the Admin Poll thread.
AdminModel _model;
int _forKitPid;
+ int _forKitWritePipe;
size_t _lastTotalMemory;
size_t _lastJiffies;
uint64_t _lastSentCount;
diff --git a/wsd/DocumentBroker.cpp b/wsd/DocumentBroker.cpp
index 02f6d2a2b..1d63e813f 100644
--- a/wsd/DocumentBroker.cpp
+++ b/wsd/DocumentBroker.cpp
@@ -298,7 +298,7 @@ void DocumentBroker::pollThread()
// Main polling loop goodness.
while (!_stop && _poll->continuePolling() && !SigUtil::getTerminationFlag())
{
- _poll->poll(SocketPoll::DefaultPollTimeoutMs);
+ _poll->ppoll(SocketPoll::DefaultPollTimeoutMicroS);
const auto now = std::chrono::steady_clock::now();
@@ -440,9 +440,9 @@ void DocumentBroker::pollThread()
}
// Flush socket data first.
- constexpr int flushTimeoutMs = POLL_TIMEOUT_MS * 2; // ~1000ms
+ constexpr int64_t flushTimeoutMicroS = POLL_TIMEOUT_MICRO_S * 2; // ~1000ms
LOG_INF("Flushing socket for doc ["
- << _docKey << "] for " << flushTimeoutMs << " ms. stop: " << _stop
+ << _docKey << "] for " << flushTimeoutMicroS << " us. stop: " << _stop
<< ", continuePolling: " << _poll->continuePolling()
<< ", ShutdownRequestFlag: " << SigUtil::getShutdownRequestFlag()
<< ", TerminationFlag: " << SigUtil::getTerminationFlag()
@@ -451,11 +451,11 @@ void DocumentBroker::pollThread()
while (_poll->getSocketCount())
{
const auto now = std::chrono::steady_clock::now();
- const int elapsedMs = std::chrono::duration_cast<std::chrono::milliseconds>(now - flushStartTime).count();
- if (elapsedMs > flushTimeoutMs)
+ const int64_t elapsedMicroS = std::chrono::duration_cast<std::chrono::microseconds>(now - flushStartTime).count();
+ if (elapsedMicroS > flushTimeoutMicroS)
break;
- _poll->poll(std::min(flushTimeoutMs - elapsedMs, POLL_TIMEOUT_MS / 5));
+ _poll->ppoll(std::min(flushTimeoutMicroS - elapsedMicroS, (int64_t)POLL_TIMEOUT_MICRO_S / 5));
}
LOG_INF("Finished flushing socket for doc [" << _docKey << "]. stop: " << _stop << ", continuePolling: " <<
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index c732169c7..1d014927a 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -2061,8 +2061,8 @@ private:
" has no DocumentBroker to handle message: [" << abbr << "].");
}
- int getPollEvents(std::chrono::steady_clock::time_point /* now */,
- int & /* timeoutMaxMs */) override
+ int pgetPollEvents(std::chrono::steady_clock::time_point /* now */,
+ int64_t & /* timeoutMaxMs */) override
{
return POLLIN;
}
@@ -2393,8 +2393,8 @@ private:
#endif
}
- int getPollEvents(std::chrono::steady_clock::time_point /* now */,
- int & /* timeoutMaxMs */) override
+ int pgetPollEvents(std::chrono::steady_clock::time_point /* now */,
+ int64_t & /* timeoutMaxMs */) override
{
return POLLIN;
}
@@ -3577,10 +3577,10 @@ int LOOLWSD::innerMain()
UnitWSD::get().invokeTest();
// This timeout affects the recovery time of prespawned children.
- const int msWait = UnitWSD::isUnitTesting() ?
- UnitWSD::get().getTimeoutMilliSeconds() / 4 :
- SocketPoll::DefaultPollTimeoutMs * 4;
- mainWait.poll(msWait);
+ const int waitMicroS = UnitWSD::isUnitTesting() ?
+ UnitWSD::get().getTimeoutMilliSeconds() * 1000 / 4 :
+ SocketPoll::DefaultPollTimeoutMicroS * 4;
+ mainWait.ppoll(waitMicroS);
// Wake the prisoner poll to spawn some children, if necessary.
PrisonerPoll.wakeup();
More information about the Libreoffice-commits
mailing list