[Libreoffice-commits] online.git: 5 commits - Makefile.am net/DelaySocket.cpp net/DelaySocket.hpp net/Socket.hpp wsd/LOOLWSD.cpp
Michael Meeks
michael.meeks at collabora.com
Sat Apr 22 20:23:02 UTC 2017
Makefile.am | 2
net/DelaySocket.cpp | 261 ++++++++++++++++++++++++++++++++++++++++++++++++++++
net/DelaySocket.hpp | 27 +++++
net/Socket.hpp | 17 ++-
wsd/LOOLWSD.cpp | 29 +++++
5 files changed, 329 insertions(+), 7 deletions(-)
New commits:
commit 8ef8e6737f74fafa1f098cba5163cf705fbae107
Author: Michael Meeks <michael.meeks at collabora.com>
Date: Sat Apr 22 18:42:06 2017 +0100
DelaySocket - disable latency for now.
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index f1bb28ea..15391c4c 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -169,7 +169,7 @@ int MasterPortNumber = DEFAULT_MASTER_PORT_NUMBER;
static bool DisplayVersion = false;
/// Funky latency simulation basic delay (ms)
-static int SimulatedLatencyMs = 150;
+static int SimulatedLatencyMs = 0; // 150;
// Tracks the set of prisoners / children waiting to be used.
static std::mutex NewChildrenMutex;
commit 5d0e4fa0201be45ac9b8eb6c479c02ca2e020fab
Author: Michael Meeks <michael.meeks at collabora.com>
Date: Sat Apr 22 18:37:38 2017 +0100
DelaySocket: hide logging.
diff --git a/net/DelaySocket.cpp b/net/DelaySocket.cpp
index d2b0f7db..29a7fc1d 100644
--- a/net/DelaySocket.cpp
+++ b/net/DelaySocket.cpp
@@ -11,6 +11,8 @@
#include "net/DelaySocket.hpp"
+#define DELAY_LOG(X) std::cerr << X << "\n";
+
class Delayer;
// FIXME: TerminatingPoll ?
@@ -79,8 +81,8 @@ public:
int remainingMs = std::chrono::duration_cast<std::chrono::milliseconds>(
(*_chunks.begin())->_sendTime - now).count();
if (remainingMs < timeoutMaxMs)
- std::cerr << "#" << getFD() << " reset timeout max to " << remainingMs
- << "ms from " << timeoutMaxMs << "ms\n";
+ DELAY_LOG("#" << getFD() << " reset timeout max to " << remainingMs
+ << "ms from " << timeoutMaxMs << "ms\n");
timeoutMaxMs = std::min(timeoutMaxMs, remainingMs);
}
@@ -116,8 +118,7 @@ public:
shutdown();
break;
}
- std::cerr << "#" << getFD() << " changed to state "
- << newState << "\n";
+ DELAY_LOG("#" << getFD() << " changed to state " << newState << "\n");
_state = newState;
}
@@ -138,8 +139,8 @@ public:
changeState(EofFlushWrites);
else if (len >= 0)
{
- std::cerr << "#" << getFD() << " read " << len
- << " to queue: " << _chunks.size() << "\n";
+ DELAY_LOG("#" << getFD() << " read " << len
+ << " to queue: " << _chunks.size() << "\n");
chunk->_data.insert(chunk->_data.end(), &buf[0], &buf[len]);
if (_dest)
_dest->_chunks.push_back(chunk);
@@ -148,7 +149,7 @@ public:
}
else if (errno != EAGAIN && errno != EWOULDBLOCK)
{
- std::cerr << "#" << getFD() << " error : " << errno << " " << strerror(errno) << "\n";
+ DELAY_LOG("#" << getFD() << " error : " << errno << " " << strerror(errno) << "\n");
changeState(Closed); // FIXME - propagate the error ?
}
}
@@ -166,7 +167,7 @@ public:
{
if (chunk->_data.size() == 0)
{ // delayed error or close
- std::cerr << "#" << getFD() << " handling delayed close\n";
+ DELAY_LOG("#" << getFD() << " handling delayed close\n");
changeState(Closed);
}
else
@@ -180,21 +181,23 @@ public:
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
- std::cerr << "#" << getFD() << " full - waiting for write\n";
+ DELAY_LOG("#" << getFD() << " full - waiting for write\n");
}
else
{
- std::cerr << "#" << getFD() << " failed onwards write " << len << "bytes of "
+ DELAY_LOG("#" << getFD() << " failed onwards write "
+ << len << "bytes of "
<< chunk->_data.size()
- << " queue: " << _chunks.size() << " error " << strerror(errno) << "\n";
+ << " queue: " << _chunks.size() << " error "
+ << strerror(errno) << "\n");
changeState(Closed);
}
}
else
{
- std::cerr << "#" << getFD() << " written onwards " << len << "bytes of "
+ DELAY_LOG("#" << getFD() << " written onwards " << len << "bytes of "
<< chunk->_data.size()
- << " queue: " << _chunks.size() << "\n";
+ << " queue: " << _chunks.size() << "\n");
if (len > 0)
chunk->_data.erase(chunk->_data.begin(), chunk->_data.begin() + len);
@@ -207,7 +210,7 @@ public:
if (events & (POLLERR | POLLHUP | POLLNVAL))
{
- std::cerr << "#" << getFD() << " error events: " << events << "\n";
+ DELAY_LOG("#" << getFD() << " error events: " << events << "\n");
changeState(Closed);
}
commit 63aba6be1ad3ad9120f7ff78fd8de4887ac1365b
Author: Michael Meeks <michael.meeks at collabora.com>
Date: Sat Apr 22 18:30:37 2017 +0100
DelaySocket: working, and much cleaner / simpler.
diff --git a/net/DelaySocket.cpp b/net/DelaySocket.cpp
index a27c618f..d2b0f7db 100644
--- a/net/DelaySocket.cpp
+++ b/net/DelaySocket.cpp
@@ -19,10 +19,11 @@ static SocketPoll DelayPoll("delay_poll");
/// Reads from fd, delays that and then writes to _dest.
class DelaySocket : public Socket {
int _delayMs;
- bool _closed;
- bool _stopPoll;
- bool _waitForWrite;
- std::shared_ptr<DelaySocket> _dest;
+ enum State { ReadWrite, // normal socket
+ EofFlushWrites, // finish up writes and close
+ Closed };
+ State _state;
+ std::shared_ptr<DelaySocket> _dest; // our writing twin.
const size_t WindowSize = 64 * 1024;
@@ -43,8 +44,8 @@ class DelaySocket : public Socket {
std::vector<std::shared_ptr<WriteChunk>> _chunks;
public:
DelaySocket(int delayMs, int fd) :
- Socket (fd), _delayMs(delayMs), _closed(false),
- _stopPoll(false), _waitForWrite(false)
+ Socket (fd), _delayMs(delayMs),
+ _state(ReadWrite)
{
// setSocketBufferSize(Socket::DefaultSendBufferSize);
}
@@ -90,16 +91,39 @@ public:
return POLLIN;
}
- void pushCloseChunk(bool bErrorSocket)
+ void pushCloseChunk()
{
- // socket in error state ? don't keep polling it.
- _stopPoll |= bErrorSocket;
_chunks.push_back(std::make_shared<WriteChunk>(_delayMs));
}
+ void changeState(State newState)
+ {
+ switch (newState)
+ {
+ case ReadWrite:
+ assert (false);
+ break;
+ case EofFlushWrites:
+ assert (_state == ReadWrite);
+ assert (_dest);
+ _dest->pushCloseChunk();
+ _dest = nullptr;
+ break;
+ case Closed:
+ if (_dest && _state == ReadWrite)
+ _dest->pushCloseChunk();
+ _dest = nullptr;
+ shutdown();
+ break;
+ }
+ std::cerr << "#" << getFD() << " changed to state "
+ << newState << "\n";
+ _state = newState;
+ }
+
HandleResult handlePoll(std::chrono::steady_clock::time_point now, int events) override
{
- if (events & POLLIN)
+ if (_state == ReadWrite && (events & POLLIN))
{
auto chunk = std::make_shared<WriteChunk>(_delayMs);
@@ -110,19 +134,9 @@ public:
len = ::read(getFD(), buf, toRead);
} while (len < 0 && errno == EINTR);
- if (len == 0)
- { // EOF.
- if (_dest) // FIXME: cut and paste ...
- {
- _dest->pushCloseChunk(false);
- _dest = nullptr;
- }
- std::cerr << "EOF on input\n";
- shutdown();
- return HandleResult::SOCKET_CLOSED;
- }
-
- if (len >= 0)
+ if (len == 0) // EOF.
+ changeState(EofFlushWrites);
+ else if (len >= 0)
{
std::cerr << "#" << getFD() << " read " << len
<< " to queue: " << _chunks.size() << "\n";
@@ -130,17 +144,21 @@ public:
if (_dest)
_dest->_chunks.push_back(chunk);
else
- std::cerr << "no destination for data\n";
+ assert("no destination for data" && false);
}
else if (errno != EAGAIN && errno != EWOULDBLOCK)
{
std::cerr << "#" << getFD() << " error : " << errno << " " << strerror(errno) << "\n";
- pushCloseChunk(true);
+ changeState(Closed); // FIXME - propagate the error ?
}
}
- // Write if we have delayed enough.
- if (_chunks.size() > 0)
+ if (_chunks.size() == 0)
+ {
+ if (_state == EofFlushWrites)
+ changeState(Closed);
+ }
+ else // Write if we have delayed enough.
{
std::shared_ptr<WriteChunk> chunk = *_chunks.begin();
if (std::chrono::duration_cast<std::chrono::milliseconds>(
@@ -149,59 +167,54 @@ public:
if (chunk->_data.size() == 0)
{ // delayed error or close
std::cerr << "#" << getFD() << " handling delayed close\n";
- _closed = true;
- _dest = nullptr;
- shutdown();
- return HandleResult::SOCKET_CLOSED;
+ changeState(Closed);
}
-
- ssize_t len;
- do {
- len = ::write(getFD(), &chunk->_data[0], chunk->_data.size());
- } while (len < 0 && errno == EINTR);
-
- if (len < 0)
+ else
{
- if (errno == EAGAIN || errno == EWOULDBLOCK)
+ ssize_t len;
+ do {
+ len = ::write(getFD(), &chunk->_data[0], chunk->_data.size());
+ } while (len < 0 && errno == EINTR);
+
+ if (len < 0)
{
- std::cerr << "#" << getFD() << " full - waiting for write\n";
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ {
+ std::cerr << "#" << getFD() << " full - waiting for write\n";
+ }
+ else
+ {
+ std::cerr << "#" << getFD() << " failed onwards write " << len << "bytes of "
+ << chunk->_data.size()
+ << " queue: " << _chunks.size() << " error " << strerror(errno) << "\n";
+ changeState(Closed);
+ }
}
else
{
- std::cerr << "#" << getFD() << " failed onwards write " << len << "bytes of "
+ std::cerr << "#" << getFD() << " written onwards " << len << "bytes of "
<< chunk->_data.size()
- << " queue: " << _chunks.size() << " error " << strerror(errno) << "\n";
- // URGH - cut and paste ...
- _closed = true;
- _dest = nullptr;
- // FIXME: tell dest we're dead ... [!] ...
- shutdown();
- return HandleResult::SOCKET_CLOSED;
- }
- }
- else
- {
- std::cerr << "#" << getFD() << " written onwards " << len << "bytes of "
- << chunk->_data.size()
- << " queue: " << _chunks.size() << "\n";
- if (len > 0)
- {
- chunk->_data.erase(chunk->_data.begin(), chunk->_data.begin() + len);
- }
+ << " queue: " << _chunks.size() << "\n";
+ if (len > 0)
+ chunk->_data.erase(chunk->_data.begin(), chunk->_data.begin() + len);
- if (chunk->_data.size() == 0)
- _chunks.erase(_chunks.begin(), _chunks.begin() + 1);
+ if (chunk->_data.size() == 0)
+ _chunks.erase(_chunks.begin(), _chunks.begin() + 1);
+ }
}
}
}
- // FIXME: ideally we could avoid polling & delay _closed state etc.
if (events & (POLLERR | POLLHUP | POLLNVAL))
{
std::cerr << "#" << getFD() << " error events: " << events << "\n";
- pushCloseChunk(true);
+ changeState(Closed);
}
- return HandleResult::CONTINUE;
+
+ if (_state == Closed)
+ return HandleResult::SOCKET_CLOSED;
+ else
+ return HandleResult::CONTINUE;
}
};
commit 26a6d514d58f8aa3255c6a424b8e27af66416bdb
Author: Michael Meeks <michael.meeks at collabora.com>
Date: Fri Apr 21 22:37:55 2017 +0100
DelaySocket - simplified but working.
Assume for now that we have infinite space in-transit.
diff --git a/net/DelaySocket.cpp b/net/DelaySocket.cpp
index 52539e7d..a27c618f 100644
--- a/net/DelaySocket.cpp
+++ b/net/DelaySocket.cpp
@@ -22,34 +22,33 @@ class DelaySocket : public Socket {
bool _closed;
bool _stopPoll;
bool _waitForWrite;
- std::weak_ptr<DelaySocket> _dest;
+ std::shared_ptr<DelaySocket> _dest;
const size_t WindowSize = 64 * 1024;
- struct DelayChunk {
+ /// queued up data - sent to us by our opposite twin.
+ struct WriteChunk {
std::chrono::steady_clock::time_point _sendTime;
std::vector<char> _data;
- DelayChunk(int delayMs)
+ WriteChunk(int delayMs)
{
_sendTime = std::chrono::steady_clock::now() +
std::chrono::milliseconds(delayMs);
}
bool isError() { return _data.size() == 0; }
private:
- DelayChunk();
+ WriteChunk();
};
- size_t _chunksSize;
- std::vector<std::shared_ptr<DelayChunk>> _chunks;
+ std::vector<std::shared_ptr<WriteChunk>> _chunks;
public:
DelaySocket(int delayMs, int fd) :
Socket (fd), _delayMs(delayMs), _closed(false),
- _stopPoll(false), _waitForWrite(false),
- _chunksSize(0)
+ _stopPoll(false), _waitForWrite(false)
{
// setSocketBufferSize(Socket::DefaultSendBufferSize);
}
- void setDestination(const std::weak_ptr<DelaySocket> &dest)
+ void setDestination(const std::shared_ptr<DelaySocket> &dest)
{
_dest = dest;
}
@@ -74,71 +73,64 @@ public:
int getPollEvents(std::chrono::steady_clock::time_point now,
int &timeoutMaxMs) override
{
- auto dest = _dest.lock();
-
- bool bOtherIsWriteBlocked = !dest || dest->_waitForWrite;
- bool bWeAreReadBlocked = _chunksSize >= WindowSize;
-
- if (_chunks.size() > 0 && (!bOtherIsWriteBlocked || !bWeAreReadBlocked))
+ if (_chunks.size() > 0)
{
int remainingMs = std::chrono::duration_cast<std::chrono::milliseconds>(
(*_chunks.begin())->_sendTime - now).count();
if (remainingMs < timeoutMaxMs)
std::cerr << "#" << getFD() << " reset timeout max to " << remainingMs
- << "ms from " << timeoutMaxMs << "ms "
- << "owb: " << bOtherIsWriteBlocked << " rb: "
- << bWeAreReadBlocked << "\n";
+ << "ms from " << timeoutMaxMs << "ms\n";
timeoutMaxMs = std::min(timeoutMaxMs, remainingMs);
}
- if (_stopPoll)
- return -1;
-
- int events = 0;
-
- if (!bWeAreReadBlocked)
- events |= POLLIN;
-
- // NB. controlled by the other socket.
- if (_waitForWrite)
- events |= POLLOUT;
-
- return events;
+ if (_chunks.size() > 0 &&
+ now > (*_chunks.begin())->_sendTime)
+ return POLLIN | POLLOUT;
+ else
+ return POLLIN;
}
void pushCloseChunk(bool bErrorSocket)
{
// socket in error state ? don't keep polling it.
_stopPoll |= bErrorSocket;
- _chunks.push_back(std::make_shared<DelayChunk>(_delayMs));
+ _chunks.push_back(std::make_shared<WriteChunk>(_delayMs));
}
HandleResult handlePoll(std::chrono::steady_clock::time_point now, int events) override
{
- auto dest = _dest.lock();
-
if (events & POLLIN)
{
- auto chunk = std::make_shared<DelayChunk>(_delayMs);
+ auto chunk = std::make_shared<WriteChunk>(_delayMs);
char buf[64 * 1024];
ssize_t len;
- size_t toRead = std::min(sizeof(buf), WindowSize - _chunksSize);
- if (_closed)
- { // get last data before async close
- toRead = sizeof (buf);
- }
+ size_t toRead = sizeof(buf); //std::min(sizeof(buf), WindowSize - _chunksSize);
do {
len = ::read(getFD(), buf, toRead);
} while (len < 0 && errno == EINTR);
+ if (len == 0)
+ { // EOF.
+ if (_dest) // FIXME: cut and paste ...
+ {
+ _dest->pushCloseChunk(false);
+ _dest = nullptr;
+ }
+ std::cerr << "EOF on input\n";
+ shutdown();
+ return HandleResult::SOCKET_CLOSED;
+ }
+
if (len >= 0)
{
std::cerr << "#" << getFD() << " read " << len
<< " to queue: " << _chunks.size() << "\n";
chunk->_data.insert(chunk->_data.end(), &buf[0], &buf[len]);
- _chunksSize += len;
- _chunks.push_back(chunk);
+ if (_dest)
+ _dest->_chunks.push_back(chunk);
+ else
+ std::cerr << "no destination for data\n";
}
else if (errno != EAGAIN && errno != EWOULDBLOCK)
{
@@ -147,65 +139,54 @@ public:
}
}
- if (_closed)
- {
- std::cerr << "#" << getFD() << " closing\n";
- dumpState(std::cerr);
- if (dest)
- {
- std::cerr << "\t#" << dest->getFD() << " closing linked\n";
- dest->dumpState(std::cerr);
- dest->pushCloseChunk(false);
- _dest.reset();
- }
- return HandleResult::SOCKET_CLOSED;
- }
-
// Write if we have delayed enough.
- if (dest && _chunks.size() > 0)
+ if (_chunks.size() > 0)
{
- std::shared_ptr<DelayChunk> chunk = *_chunks.begin();
+ std::shared_ptr<WriteChunk> chunk = *_chunks.begin();
if (std::chrono::duration_cast<std::chrono::milliseconds>(
now - chunk->_sendTime).count() >= 0)
{
- dest->_waitForWrite = false;
-
if (chunk->_data.size() == 0)
{ // delayed error or close
- std::cerr << "#" << getFD() << " handling delayed close with " << _chunksSize << "bytes left\n";
+ std::cerr << "#" << getFD() << " handling delayed close\n";
_closed = true;
- return HandleResult::CONTINUE;
+ _dest = nullptr;
+ shutdown();
+ return HandleResult::SOCKET_CLOSED;
}
ssize_t len;
do {
- len = ::write(dest->getFD(), &chunk->_data[0], chunk->_data.size());
+ len = ::write(getFD(), &chunk->_data[0], chunk->_data.size());
} while (len < 0 && errno == EINTR);
if (len < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
- dest->_waitForWrite = true;
- std::cerr << "#" << dest->getFD() << " full - waiting for write ultimately from fd #" << getFD() << "\n";
+ std::cerr << "#" << getFD() << " full - waiting for write\n";
}
else
{
- std::cerr << "#" << dest->getFD() << " failed onwards write " << len << "bytes of "
- << chunk->_data.size() << " ultimately from fd #" << getFD()
+ std::cerr << "#" << getFD() << " failed onwards write " << len << "bytes of "
+ << chunk->_data.size()
<< " queue: " << _chunks.size() << " error " << strerror(errno) << "\n";
- dest->pushCloseChunk(false);
+ // URGH - cut and paste ...
+ _closed = true;
+ _dest = nullptr;
+ // FIXME: tell dest we're dead ... [!] ...
+ shutdown();
+ return HandleResult::SOCKET_CLOSED;
}
}
else
{
- std::cerr << "#" << dest->getFD() << " written onwards " << len << "bytes of "
- << chunk->_data.size() << " ultimately from fd #" << getFD()
+ std::cerr << "#" << getFD() << " written onwards " << len << "bytes of "
+ << chunk->_data.size()
<< " queue: " << _chunks.size() << "\n";
if (len > 0)
{
chunk->_data.erase(chunk->_data.begin(), chunk->_data.begin() + len);
- _chunksSize -= len;
}
if (chunk->_data.size() == 0)
commit 1c7f94045a433625f9ec55b137bbd0322f27d532
Author: Michael Meeks <michael.meeks at collabora.com>
Date: Thu Mar 23 17:14:51 2017 +0000
Initial DelaySocket goodness.
diff --git a/Makefile.am b/Makefile.am
index a348f3f4..c72ad4d0 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -53,6 +53,7 @@ shared_sources = common/FileUtil.cpp \
common/Unit.cpp \
common/UnitHTTP.cpp \
common/Util.cpp \
+ net/DelaySocket.cpp \
net/Socket.cpp
if ENABLE_SSL
shared_sources += net/Ssl.cpp
@@ -162,6 +163,7 @@ shared_headers = common/Common.hpp \
common/SigUtil.hpp \
common/security.h \
common/SpookyV2.h \
+ net/DelaySocket.hpp \
net/ServerSocket.hpp \
net/Socket.hpp \
net/WebSocketHandler.hpp \
diff --git a/net/DelaySocket.cpp b/net/DelaySocket.cpp
new file mode 100644
index 00000000..52539e7d
--- /dev/null
+++ b/net/DelaySocket.cpp
@@ -0,0 +1,264 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+#include "config.h"
+
+#include "net/DelaySocket.hpp"
+
+class Delayer;
+
+// FIXME: TerminatingPoll ?
+static SocketPoll DelayPoll("delay_poll");
+
+/// Reads from fd, delays that and then writes to _dest.
+class DelaySocket : public Socket {
+ int _delayMs;
+ bool _closed;
+ bool _stopPoll;
+ bool _waitForWrite;
+ std::weak_ptr<DelaySocket> _dest;
+
+ const size_t WindowSize = 64 * 1024;
+
+ struct DelayChunk {
+ std::chrono::steady_clock::time_point _sendTime;
+ std::vector<char> _data;
+ DelayChunk(int delayMs)
+ {
+ _sendTime = std::chrono::steady_clock::now() +
+ std::chrono::milliseconds(delayMs);
+ }
+ bool isError() { return _data.size() == 0; }
+ private:
+ DelayChunk();
+ };
+
+ size_t _chunksSize;
+ std::vector<std::shared_ptr<DelayChunk>> _chunks;
+public:
+ DelaySocket(int delayMs, int fd) :
+ Socket (fd), _delayMs(delayMs), _closed(false),
+ _stopPoll(false), _waitForWrite(false),
+ _chunksSize(0)
+ {
+// setSocketBufferSize(Socket::DefaultSendBufferSize);
+ }
+ void setDestination(const std::weak_ptr<DelaySocket> &dest)
+ {
+ _dest = dest;
+ }
+
+ void dumpState(std::ostream& os) override
+ {
+ os << "\tfd: " << getFD()
+ << "\n\tqueue: " << _chunks.size() << "\n";
+ auto now = std::chrono::steady_clock::now();
+ for (auto &chunk : _chunks)
+ {
+ os << "\t\tin: " <<
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ chunk->_sendTime - now).count() << "ms - "
+ << chunk->_data.size() << "bytes\n";
+ }
+ }
+
+ // FIXME - really need to propagate 'noDelay' etc.
+ // have a debug only lookup of delayed sockets for this case ?
+
+ int getPollEvents(std::chrono::steady_clock::time_point now,
+ int &timeoutMaxMs) override
+ {
+ auto dest = _dest.lock();
+
+ bool bOtherIsWriteBlocked = !dest || dest->_waitForWrite;
+ bool bWeAreReadBlocked = _chunksSize >= WindowSize;
+
+ if (_chunks.size() > 0 && (!bOtherIsWriteBlocked || !bWeAreReadBlocked))
+ {
+ int remainingMs = std::chrono::duration_cast<std::chrono::milliseconds>(
+ (*_chunks.begin())->_sendTime - now).count();
+ if (remainingMs < timeoutMaxMs)
+ std::cerr << "#" << getFD() << " reset timeout max to " << remainingMs
+ << "ms from " << timeoutMaxMs << "ms "
+ << "owb: " << bOtherIsWriteBlocked << " rb: "
+ << bWeAreReadBlocked << "\n";
+ timeoutMaxMs = std::min(timeoutMaxMs, remainingMs);
+ }
+
+ if (_stopPoll)
+ return -1;
+
+ int events = 0;
+
+ if (!bWeAreReadBlocked)
+ events |= POLLIN;
+
+ // NB. controlled by the other socket.
+ if (_waitForWrite)
+ events |= POLLOUT;
+
+ return events;
+ }
+
+ void pushCloseChunk(bool bErrorSocket)
+ {
+ // socket in error state ? don't keep polling it.
+ _stopPoll |= bErrorSocket;
+ _chunks.push_back(std::make_shared<DelayChunk>(_delayMs));
+ }
+
+ HandleResult handlePoll(std::chrono::steady_clock::time_point now, int events) override
+ {
+ auto dest = _dest.lock();
+
+ if (events & POLLIN)
+ {
+ auto chunk = std::make_shared<DelayChunk>(_delayMs);
+
+ char buf[64 * 1024];
+ ssize_t len;
+ size_t toRead = std::min(sizeof(buf), WindowSize - _chunksSize);
+ if (_closed)
+ { // get last data before async close
+ toRead = sizeof (buf);
+ }
+ do {
+ len = ::read(getFD(), buf, toRead);
+ } while (len < 0 && errno == EINTR);
+
+ if (len >= 0)
+ {
+ std::cerr << "#" << getFD() << " read " << len
+ << " to queue: " << _chunks.size() << "\n";
+ chunk->_data.insert(chunk->_data.end(), &buf[0], &buf[len]);
+ _chunksSize += len;
+ _chunks.push_back(chunk);
+ }
+ else if (errno != EAGAIN && errno != EWOULDBLOCK)
+ {
+ std::cerr << "#" << getFD() << " error : " << errno << " " << strerror(errno) << "\n";
+ pushCloseChunk(true);
+ }
+ }
+
+ if (_closed)
+ {
+ std::cerr << "#" << getFD() << " closing\n";
+ dumpState(std::cerr);
+ if (dest)
+ {
+ std::cerr << "\t#" << dest->getFD() << " closing linked\n";
+ dest->dumpState(std::cerr);
+ dest->pushCloseChunk(false);
+ _dest.reset();
+ }
+ return HandleResult::SOCKET_CLOSED;
+ }
+
+ // Write if we have delayed enough.
+ if (dest && _chunks.size() > 0)
+ {
+ std::shared_ptr<DelayChunk> chunk = *_chunks.begin();
+ if (std::chrono::duration_cast<std::chrono::milliseconds>(
+ now - chunk->_sendTime).count() >= 0)
+ {
+ dest->_waitForWrite = false;
+
+ if (chunk->_data.size() == 0)
+ { // delayed error or close
+ std::cerr << "#" << getFD() << " handling delayed close with " << _chunksSize << "bytes left\n";
+ _closed = true;
+ return HandleResult::CONTINUE;
+ }
+
+ ssize_t len;
+ do {
+ len = ::write(dest->getFD(), &chunk->_data[0], chunk->_data.size());
+ } while (len < 0 && errno == EINTR);
+
+ if (len < 0)
+ {
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ {
+ dest->_waitForWrite = true;
+ std::cerr << "#" << dest->getFD() << " full - waiting for write ultimately from fd #" << getFD() << "\n";
+ }
+ else
+ {
+ std::cerr << "#" << dest->getFD() << " failed onwards write " << len << "bytes of "
+ << chunk->_data.size() << " ultimately from fd #" << getFD()
+ << " queue: " << _chunks.size() << " error " << strerror(errno) << "\n";
+ dest->pushCloseChunk(false);
+ }
+ }
+ else
+ {
+ std::cerr << "#" << dest->getFD() << " written onwards " << len << "bytes of "
+ << chunk->_data.size() << " ultimately from fd #" << getFD()
+ << " queue: " << _chunks.size() << "\n";
+ if (len > 0)
+ {
+ chunk->_data.erase(chunk->_data.begin(), chunk->_data.begin() + len);
+ _chunksSize -= len;
+ }
+
+ if (chunk->_data.size() == 0)
+ _chunks.erase(_chunks.begin(), _chunks.begin() + 1);
+ }
+ }
+ }
+
+ // FIXME: ideally we could avoid polling & delay _closed state etc.
+ if (events & (POLLERR | POLLHUP | POLLNVAL))
+ {
+ std::cerr << "#" << getFD() << " error events: " << events << "\n";
+ pushCloseChunk(true);
+ }
+ return HandleResult::CONTINUE;
+ }
+};
+
+/// Delayer:
+///
+/// Some terminology:
+/// physical socket (DelaySocket's own fd) - what we accepted.
+/// internalFd - the internal side of the socket-pair
+/// delayFd - what we hand on to our un-suspecting wrapped socket
+/// which looks like an external socket - but delayed.
+namespace Delay {
+ int create(int delayMs, int physicalFd)
+ {
+ int pair[2];
+ int rc = socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0, pair);
+ assert (rc == 0);
+ int internalFd = pair[0];
+ int delayFd = pair[1];
+
+ auto physical = std::make_shared<DelaySocket>(delayMs, physicalFd);
+ auto internal = std::make_shared<DelaySocket>(delayMs, internalFd);
+ physical->setDestination(internal);
+ internal->setDestination(physical);
+
+ DelayPoll.startThread();
+ DelayPoll.insertNewSocket(physical);
+ DelayPoll.insertNewSocket(internal);
+
+ return delayFd;
+ }
+ void dumpState(std::ostream &os)
+ {
+ if (DelayPoll.isAlive())
+ {
+ os << "Delay poll:\n";
+ DelayPoll.dumpState(os);
+ }
+ }
+}
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/net/DelaySocket.hpp b/net/DelaySocket.hpp
new file mode 100644
index 00000000..df423b52
--- /dev/null
+++ b/net/DelaySocket.hpp
@@ -0,0 +1,27 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+#ifndef INCLUDED_DELAY_SOCKET_HPP
+#define INCLUDED_DELAY_SOCKET_HPP
+
+#include <Socket.hpp>
+
+/// Simulates network latency for local debugging.
+///
+/// We are lifecycle managed internally based on the physical /
+/// delayFd lifecycle.
+namespace Delay
+{
+ int create(int delayMs, int physicalFd);
+ void dumpState(std::ostream &os);
+};
+
+#endif
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/net/Socket.hpp b/net/Socket.hpp
index 8c07e38d..0e616ddf 100644
--- a/net/Socket.hpp
+++ b/net/Socket.hpp
@@ -84,7 +84,7 @@ public:
virtual HandleResult handlePoll(std::chrono::steady_clock::time_point now, int events) = 0;
/// manage latency issues around packet aggregation
- void setNoDelay()
+ virtual void setNoDelay()
{
const int val = 1;
setsockopt (_fd, IPPROTO_TCP, TCP_NODELAY,
@@ -182,7 +182,7 @@ public:
virtual void dumpState(std::ostream&) {}
/// Set the thread-id we're bound to
- void setThreadOwner(const std::thread::id &id)
+ virtual void setThreadOwner(const std::thread::id &id)
{
if (id != _owner)
{
@@ -518,8 +518,17 @@ private:
for (size_t i = 0; i < size; ++i)
{
- _pollFds[i].fd = _pollSockets[i]->getFD();
- _pollFds[i].events = _pollSockets[i]->getPollEvents(now, timeoutMaxMs);
+ int events = _pollSockets[i]->getPollEvents(now, timeoutMaxMs);
+ if (events < 0) // timeout on dead socket
+ {
+ _pollFds[i].fd = _wakeup[0];
+ _pollFds[i].events = 0;
+ }
+ else
+ {
+ _pollFds[i].fd = _pollSockets[i]->getFD();
+ _pollFds[i].events = events;
+ }
_pollFds[i].revents = 0;
}
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index a0529380..f1bb28ea 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -100,6 +100,7 @@
#if ENABLE_SSL
# include "SslSocket.hpp"
#endif
+#include "DelaySocket.hpp"
#include "Storage.hpp"
#include "TraceFile.hpp"
#include "Unit.hpp"
@@ -167,6 +168,9 @@ int MasterPortNumber = DEFAULT_MASTER_PORT_NUMBER;
//TODO: Move to a more sensible namespace.
static bool DisplayVersion = false;
+/// Funky latency simulation basic delay (ms)
+static int SimulatedLatencyMs = 150;
+
// Tracks the set of prisoners / children waiting to be used.
static std::mutex NewChildrenMutex;
static std::condition_variable NewChildrenCV;
@@ -2113,17 +2117,32 @@ private:
class PlainSocketFactory : public SocketFactory
{
- std::shared_ptr<Socket> create(const int fd) override
+ std::shared_ptr<Socket> create(const int physicalFd) override
{
- return StreamSocket::create<StreamSocket>(fd, std::unique_ptr<SocketHandlerInterface>{ new ClientRequestDispatcher });
+ int fd = physicalFd;
+
+ if (SimulatedLatencyMs > 0)
+ fd = Delay::create(SimulatedLatencyMs, physicalFd);
+
+ std::shared_ptr<Socket> socket =
+ StreamSocket::create<StreamSocket>(
+ fd, std::unique_ptr<SocketHandlerInterface>{
+ new ClientRequestDispatcher });
+
+ return socket;
}
};
#if ENABLE_SSL
class SslSocketFactory : public SocketFactory
{
- std::shared_ptr<Socket> create(const int fd) override
+ std::shared_ptr<Socket> create(const int physicalFd) override
{
+ int fd = physicalFd;
+
+ if (SimulatedLatencyMs > 0)
+ fd = Delay::create(SimulatedLatencyMs, physicalFd);
+
return StreamSocket::create<SslStreamSocket>(fd, std::unique_ptr<SocketHandlerInterface>{ new ClientRequestDispatcher });
}
};
@@ -2133,6 +2152,7 @@ class PrisonerSocketFactory : public SocketFactory
{
std::shared_ptr<Socket> create(const int fd) override
{
+ // No local delay.
return StreamSocket::create<StreamSocket>(fd, std::unique_ptr<SocketHandlerInterface>{ new PrisonerRequestDispatcher });
}
};
@@ -2207,6 +2227,9 @@ public:
os << "Admin poll:\n";
Admin::instance().dumpState(os);
+ // If we have any delaying work going on.
+ Delay::dumpState(os);
+
os << "Document Broker polls "
<< "[ " << DocBrokers.size() << " ]:\n";
for (auto &i : DocBrokers)
More information about the Libreoffice-commits mailing list