[Libreoffice-commits] online.git: Branch 'private/mmeeks/delaysocket' - 2 commits - net/DelaySocket.cpp
Michael Meeks
michael.meeks at collabora.com
Sat Apr 22 17:41:58 UTC 2017
net/DelaySocket.cpp | 158 ++++++++++++++++++++++++++++------------------------
1 file changed, 87 insertions(+), 71 deletions(-)
New commits:
commit 7b24ecd447bf6f9d8066a9c1547aab6e6f193502
Author: Michael Meeks <michael.meeks at collabora.com>
Date: Sat Apr 22 18:37:38 2017 +0100
DelaySocket: hide logging.
diff --git a/net/DelaySocket.cpp b/net/DelaySocket.cpp
index d2b0f7db..29a7fc1d 100644
--- a/net/DelaySocket.cpp
+++ b/net/DelaySocket.cpp
@@ -11,6 +11,8 @@
#include "net/DelaySocket.hpp"
+#define DELAY_LOG(X) std::cerr << X << "\n";
+
class Delayer;
// FIXME: TerminatingPoll ?
@@ -79,8 +81,8 @@ public:
int remainingMs = std::chrono::duration_cast<std::chrono::milliseconds>(
(*_chunks.begin())->_sendTime - now).count();
if (remainingMs < timeoutMaxMs)
- std::cerr << "#" << getFD() << " reset timeout max to " << remainingMs
- << "ms from " << timeoutMaxMs << "ms\n";
+ DELAY_LOG("#" << getFD() << " reset timeout max to " << remainingMs
+ << "ms from " << timeoutMaxMs << "ms\n");
timeoutMaxMs = std::min(timeoutMaxMs, remainingMs);
}
@@ -116,8 +118,7 @@ public:
shutdown();
break;
}
- std::cerr << "#" << getFD() << " changed to state "
- << newState << "\n";
+ DELAY_LOG("#" << getFD() << " changed to state " << newState << "\n");
_state = newState;
}
@@ -138,8 +139,8 @@ public:
changeState(EofFlushWrites);
else if (len >= 0)
{
- std::cerr << "#" << getFD() << " read " << len
- << " to queue: " << _chunks.size() << "\n";
+ DELAY_LOG("#" << getFD() << " read " << len
+ << " to queue: " << _chunks.size() << "\n");
chunk->_data.insert(chunk->_data.end(), &buf[0], &buf[len]);
if (_dest)
_dest->_chunks.push_back(chunk);
@@ -148,7 +149,7 @@ public:
}
else if (errno != EAGAIN && errno != EWOULDBLOCK)
{
- std::cerr << "#" << getFD() << " error : " << errno << " " << strerror(errno) << "\n";
+ DELAY_LOG("#" << getFD() << " error : " << errno << " " << strerror(errno) << "\n");
changeState(Closed); // FIXME - propagate the error ?
}
}
@@ -166,7 +167,7 @@ public:
{
if (chunk->_data.size() == 0)
{ // delayed error or close
- std::cerr << "#" << getFD() << " handling delayed close\n";
+ DELAY_LOG("#" << getFD() << " handling delayed close\n");
changeState(Closed);
}
else
@@ -180,21 +181,23 @@ public:
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
- std::cerr << "#" << getFD() << " full - waiting for write\n";
+ DELAY_LOG("#" << getFD() << " full - waiting for write\n");
}
else
{
- std::cerr << "#" << getFD() << " failed onwards write " << len << "bytes of "
+ DELAY_LOG("#" << getFD() << " failed onwards write "
+ << len << "bytes of "
<< chunk->_data.size()
- << " queue: " << _chunks.size() << " error " << strerror(errno) << "\n";
+ << " queue: " << _chunks.size() << " error "
+ << strerror(errno) << "\n");
changeState(Closed);
}
}
else
{
- std::cerr << "#" << getFD() << " written onwards " << len << "bytes of "
+ DELAY_LOG("#" << getFD() << " written onwards " << len << "bytes of "
<< chunk->_data.size()
- << " queue: " << _chunks.size() << "\n";
+ << " queue: " << _chunks.size() << "\n");
if (len > 0)
chunk->_data.erase(chunk->_data.begin(), chunk->_data.begin() + len);
@@ -207,7 +210,7 @@ public:
if (events & (POLLERR | POLLHUP | POLLNVAL))
{
- std::cerr << "#" << getFD() << " error events: " << events << "\n";
+ DELAY_LOG("#" << getFD() << " error events: " << events << "\n");
changeState(Closed);
}
commit efced88935d88c534bef9d7f6ae6ad7539d7b51d
Author: Michael Meeks <michael.meeks at collabora.com>
Date: Sat Apr 22 18:30:37 2017 +0100
DelaySocket: working, and much cleaner / simpler.
diff --git a/net/DelaySocket.cpp b/net/DelaySocket.cpp
index a27c618f..d2b0f7db 100644
--- a/net/DelaySocket.cpp
+++ b/net/DelaySocket.cpp
@@ -19,10 +19,11 @@ static SocketPoll DelayPoll("delay_poll");
/// Reads from fd, delays that and then writes to _dest.
class DelaySocket : public Socket {
int _delayMs;
- bool _closed;
- bool _stopPoll;
- bool _waitForWrite;
- std::shared_ptr<DelaySocket> _dest;
+ enum State { ReadWrite, // normal socket
+ EofFlushWrites, // finish up writes and close
+ Closed };
+ State _state;
+ std::shared_ptr<DelaySocket> _dest; // our writing twin.
const size_t WindowSize = 64 * 1024;
@@ -43,8 +44,8 @@ class DelaySocket : public Socket {
std::vector<std::shared_ptr<WriteChunk>> _chunks;
public:
DelaySocket(int delayMs, int fd) :
- Socket (fd), _delayMs(delayMs), _closed(false),
- _stopPoll(false), _waitForWrite(false)
+ Socket (fd), _delayMs(delayMs),
+ _state(ReadWrite)
{
// setSocketBufferSize(Socket::DefaultSendBufferSize);
}
@@ -90,16 +91,39 @@ public:
return POLLIN;
}
- void pushCloseChunk(bool bErrorSocket)
+ void pushCloseChunk()
{
- // socket in error state ? don't keep polling it.
- _stopPoll |= bErrorSocket;
_chunks.push_back(std::make_shared<WriteChunk>(_delayMs));
}
+ void changeState(State newState)
+ {
+ switch (newState)
+ {
+ case ReadWrite:
+ assert (false);
+ break;
+ case EofFlushWrites:
+ assert (_state == ReadWrite);
+ assert (_dest);
+ _dest->pushCloseChunk();
+ _dest = nullptr;
+ break;
+ case Closed:
+ if (_dest && _state == ReadWrite)
+ _dest->pushCloseChunk();
+ _dest = nullptr;
+ shutdown();
+ break;
+ }
+ std::cerr << "#" << getFD() << " changed to state "
+ << newState << "\n";
+ _state = newState;
+ }
+
HandleResult handlePoll(std::chrono::steady_clock::time_point now, int events) override
{
- if (events & POLLIN)
+ if (_state == ReadWrite && (events & POLLIN))
{
auto chunk = std::make_shared<WriteChunk>(_delayMs);
@@ -110,19 +134,9 @@ public:
len = ::read(getFD(), buf, toRead);
} while (len < 0 && errno == EINTR);
- if (len == 0)
- { // EOF.
- if (_dest) // FIXME: cut and paste ...
- {
- _dest->pushCloseChunk(false);
- _dest = nullptr;
- }
- std::cerr << "EOF on input\n";
- shutdown();
- return HandleResult::SOCKET_CLOSED;
- }
-
- if (len >= 0)
+ if (len == 0) // EOF.
+ changeState(EofFlushWrites);
+ else if (len >= 0)
{
std::cerr << "#" << getFD() << " read " << len
<< " to queue: " << _chunks.size() << "\n";
@@ -130,17 +144,21 @@ public:
if (_dest)
_dest->_chunks.push_back(chunk);
else
- std::cerr << "no destination for data\n";
+ assert("no destination for data" && false);
}
else if (errno != EAGAIN && errno != EWOULDBLOCK)
{
std::cerr << "#" << getFD() << " error : " << errno << " " << strerror(errno) << "\n";
- pushCloseChunk(true);
+ changeState(Closed); // FIXME - propagate the error ?
}
}
- // Write if we have delayed enough.
- if (_chunks.size() > 0)
+ if (_chunks.size() == 0)
+ {
+ if (_state == EofFlushWrites)
+ changeState(Closed);
+ }
+ else // Write if we have delayed enough.
{
std::shared_ptr<WriteChunk> chunk = *_chunks.begin();
if (std::chrono::duration_cast<std::chrono::milliseconds>(
@@ -149,59 +167,54 @@ public:
if (chunk->_data.size() == 0)
{ // delayed error or close
std::cerr << "#" << getFD() << " handling delayed close\n";
- _closed = true;
- _dest = nullptr;
- shutdown();
- return HandleResult::SOCKET_CLOSED;
+ changeState(Closed);
}
-
- ssize_t len;
- do {
- len = ::write(getFD(), &chunk->_data[0], chunk->_data.size());
- } while (len < 0 && errno == EINTR);
-
- if (len < 0)
+ else
{
- if (errno == EAGAIN || errno == EWOULDBLOCK)
+ ssize_t len;
+ do {
+ len = ::write(getFD(), &chunk->_data[0], chunk->_data.size());
+ } while (len < 0 && errno == EINTR);
+
+ if (len < 0)
{
- std::cerr << "#" << getFD() << " full - waiting for write\n";
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ {
+ std::cerr << "#" << getFD() << " full - waiting for write\n";
+ }
+ else
+ {
+ std::cerr << "#" << getFD() << " failed onwards write " << len << "bytes of "
+ << chunk->_data.size()
+ << " queue: " << _chunks.size() << " error " << strerror(errno) << "\n";
+ changeState(Closed);
+ }
}
else
{
- std::cerr << "#" << getFD() << " failed onwards write " << len << "bytes of "
+ std::cerr << "#" << getFD() << " written onwards " << len << "bytes of "
<< chunk->_data.size()
- << " queue: " << _chunks.size() << " error " << strerror(errno) << "\n";
- // URGH - cut and paste ...
- _closed = true;
- _dest = nullptr;
- // FIXME: tell dest we're dead ... [!] ...
- shutdown();
- return HandleResult::SOCKET_CLOSED;
- }
- }
- else
- {
- std::cerr << "#" << getFD() << " written onwards " << len << "bytes of "
- << chunk->_data.size()
- << " queue: " << _chunks.size() << "\n";
- if (len > 0)
- {
- chunk->_data.erase(chunk->_data.begin(), chunk->_data.begin() + len);
- }
+ << " queue: " << _chunks.size() << "\n";
+ if (len > 0)
+ chunk->_data.erase(chunk->_data.begin(), chunk->_data.begin() + len);
- if (chunk->_data.size() == 0)
- _chunks.erase(_chunks.begin(), _chunks.begin() + 1);
+ if (chunk->_data.size() == 0)
+ _chunks.erase(_chunks.begin(), _chunks.begin() + 1);
+ }
}
}
}
- // FIXME: ideally we could avoid polling & delay _closed state etc.
if (events & (POLLERR | POLLHUP | POLLNVAL))
{
std::cerr << "#" << getFD() << " error events: " << events << "\n";
- pushCloseChunk(true);
+ changeState(Closed);
}
- return HandleResult::CONTINUE;
+
+ if (_state == Closed)
+ return HandleResult::SOCKET_CLOSED;
+ else
+ return HandleResult::CONTINUE;
}
};
More information about the Libreoffice-commits mailing list