[Libreoffice-commits] online.git: 5 commits - Makefile.am net/DelaySocket.cpp net/DelaySocket.hpp net/Socket.hpp wsd/LOOLWSD.cpp

Michael Meeks michael.meeks at collabora.com
Sat Apr 22 20:23:02 UTC 2017


 Makefile.am         |    2 
 net/DelaySocket.cpp |  261 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 net/DelaySocket.hpp |   27 +++++
 net/Socket.hpp      |   17 ++-
 wsd/LOOLWSD.cpp     |   29 +++++
 5 files changed, 329 insertions(+), 7 deletions(-)

New commits:
commit 8ef8e6737f74fafa1f098cba5163cf705fbae107
Author: Michael Meeks <michael.meeks at collabora.com>
Date:   Sat Apr 22 18:42:06 2017 +0100

    DelaySocket - disable latency for now.

diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index f1bb28ea..15391c4c 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -169,7 +169,7 @@ int MasterPortNumber = DEFAULT_MASTER_PORT_NUMBER;
 static bool DisplayVersion = false;
 
 /// Funky latency simulation basic delay (ms)
-static int SimulatedLatencyMs = 150;
+static int SimulatedLatencyMs = 0; // 150;
 
 // Tracks the set of prisoners / children waiting to be used.
 static std::mutex NewChildrenMutex;
commit 5d0e4fa0201be45ac9b8eb6c479c02ca2e020fab
Author: Michael Meeks <michael.meeks at collabora.com>
Date:   Sat Apr 22 18:37:38 2017 +0100

    DelaySocket: hide logging.

diff --git a/net/DelaySocket.cpp b/net/DelaySocket.cpp
index d2b0f7db..29a7fc1d 100644
--- a/net/DelaySocket.cpp
+++ b/net/DelaySocket.cpp
@@ -11,6 +11,8 @@
 
 #include "net/DelaySocket.hpp"
 
+#define DELAY_LOG(X) std::cerr << X << "\n";
+
 class Delayer;
 
 // FIXME: TerminatingPoll ?
@@ -79,8 +81,8 @@ public:
             int remainingMs = std::chrono::duration_cast<std::chrono::milliseconds>(
                 (*_chunks.begin())->_sendTime - now).count();
             if (remainingMs < timeoutMaxMs)
-                std::cerr << "#" << getFD() << " reset timeout max to " << remainingMs
-                          << "ms from " << timeoutMaxMs << "ms\n";
+                DELAY_LOG("#" << getFD() << " reset timeout max to " << remainingMs
+                          << "ms from " << timeoutMaxMs << "ms\n");
             timeoutMaxMs = std::min(timeoutMaxMs, remainingMs);
         }
 
@@ -116,8 +118,7 @@ public:
             shutdown();
             break;
         }
-        std::cerr << "#" << getFD() << " changed to state "
-                  << newState << "\n";
+        DELAY_LOG("#" << getFD() << " changed to state " << newState << "\n");
         _state = newState;
     }
 
@@ -138,8 +139,8 @@ public:
                 changeState(EofFlushWrites);
             else if (len >= 0)
             {
-                std::cerr << "#" << getFD() << " read " << len
-                      << " to queue: " << _chunks.size() << "\n";
+                DELAY_LOG("#" << getFD() << " read " << len
+                          << " to queue: " << _chunks.size() << "\n");
                 chunk->_data.insert(chunk->_data.end(), &buf[0], &buf[len]);
                 if (_dest)
                     _dest->_chunks.push_back(chunk);
@@ -148,7 +149,7 @@ public:
             }
             else if (errno != EAGAIN && errno != EWOULDBLOCK)
             {
-                std::cerr << "#" << getFD() << " error : " << errno << " " << strerror(errno) << "\n";
+                DELAY_LOG("#" << getFD() << " error : " << errno << " " << strerror(errno) << "\n");
                 changeState(Closed); // FIXME - propagate the error ?
             }
         }
@@ -166,7 +167,7 @@ public:
             {
                 if (chunk->_data.size() == 0)
                 { // delayed error or close
-                    std::cerr << "#" << getFD() << " handling delayed close\n";
+                    DELAY_LOG("#" << getFD() << " handling delayed close\n");
                     changeState(Closed);
                 }
                 else
@@ -180,21 +181,23 @@ public:
                     {
                         if (errno == EAGAIN || errno == EWOULDBLOCK)
                         {
-                            std::cerr << "#" << getFD() << " full - waiting for write\n";
+                            DELAY_LOG("#" << getFD() << " full - waiting for write\n");
                         }
                         else
                         {
-                            std::cerr << "#" << getFD() << " failed onwards write " << len << "bytes of "
+                            DELAY_LOG("#" << getFD() << " failed onwards write "
+                                      << len << "bytes of "
                                       << chunk->_data.size()
-                                  << " queue: " << _chunks.size() << " error " << strerror(errno) << "\n";
+                                      << " queue: " << _chunks.size() << " error "
+                                      << strerror(errno) << "\n");
                             changeState(Closed);
                         }
                     }
                     else
                     {
-                        std::cerr << "#" << getFD() << " written onwards " << len << "bytes of "
+                        DELAY_LOG("#" << getFD() << " written onwards " << len << "bytes of "
                                   << chunk->_data.size()
-                                  << " queue: " << _chunks.size() << "\n";
+                                  << " queue: " << _chunks.size() << "\n");
                         if (len > 0)
                             chunk->_data.erase(chunk->_data.begin(), chunk->_data.begin() + len);
 
@@ -207,7 +210,7 @@ public:
 
         if (events & (POLLERR | POLLHUP | POLLNVAL))
         {
-            std::cerr << "#" << getFD() << " error events: " << events << "\n";
+            DELAY_LOG("#" << getFD() << " error events: " << events << "\n");
             changeState(Closed);
         }
 
commit 63aba6be1ad3ad9120f7ff78fd8de4887ac1365b
Author: Michael Meeks <michael.meeks at collabora.com>
Date:   Sat Apr 22 18:30:37 2017 +0100

    DelaySocket: working, and much cleaner / simpler.

diff --git a/net/DelaySocket.cpp b/net/DelaySocket.cpp
index a27c618f..d2b0f7db 100644
--- a/net/DelaySocket.cpp
+++ b/net/DelaySocket.cpp
@@ -19,10 +19,11 @@ static SocketPoll DelayPoll("delay_poll");
 /// Reads from fd, delays that and then writes to _dest.
 class DelaySocket : public Socket {
     int _delayMs;
-    bool _closed;
-    bool _stopPoll;
-    bool _waitForWrite;
-    std::shared_ptr<DelaySocket> _dest;
+    enum State { ReadWrite,      // normal socket
+                 EofFlushWrites, // finish up writes and close
+                 Closed };
+    State _state;
+    std::shared_ptr<DelaySocket> _dest; // our writing twin.
 
     const size_t WindowSize = 64 * 1024;
 
@@ -43,8 +44,8 @@ class DelaySocket : public Socket {
     std::vector<std::shared_ptr<WriteChunk>> _chunks;
 public:
     DelaySocket(int delayMs, int fd) :
-        Socket (fd), _delayMs(delayMs), _closed(false),
-        _stopPoll(false), _waitForWrite(false)
+        Socket (fd), _delayMs(delayMs),
+        _state(ReadWrite)
 	{
 //        setSocketBufferSize(Socket::DefaultSendBufferSize);
 	}
@@ -90,16 +91,39 @@ public:
             return POLLIN;
     }
 
-    void pushCloseChunk(bool bErrorSocket)
+    void pushCloseChunk()
     {
-        // socket in error state ? don't keep polling it.
-        _stopPoll |= bErrorSocket;
         _chunks.push_back(std::make_shared<WriteChunk>(_delayMs));
     }
 
+    void changeState(State newState)
+    {
+        switch (newState)
+        {
+        case ReadWrite:
+            assert (false);
+            break;
+        case EofFlushWrites:
+            assert (_state == ReadWrite);
+            assert (_dest);
+            _dest->pushCloseChunk();
+            _dest = nullptr;
+            break;
+        case Closed:
+            if (_dest && _state == ReadWrite)
+                _dest->pushCloseChunk();
+            _dest = nullptr;
+            shutdown();
+            break;
+        }
+        std::cerr << "#" << getFD() << " changed to state "
+                  << newState << "\n";
+        _state = newState;
+    }
+
     HandleResult handlePoll(std::chrono::steady_clock::time_point now, int events) override
     {
-        if (events & POLLIN)
+        if (_state == ReadWrite && (events & POLLIN))
         {
             auto chunk = std::make_shared<WriteChunk>(_delayMs);
 
@@ -110,19 +134,9 @@ public:
                 len = ::read(getFD(), buf, toRead);
             } while (len < 0 && errno == EINTR);
 
-            if (len == 0)
-            { // EOF.
-                if (_dest) // FIXME: cut and paste ...
-                {
-                    _dest->pushCloseChunk(false);
-                    _dest = nullptr;
-                }
-                std::cerr << "EOF on input\n";
-                shutdown();
-                return HandleResult::SOCKET_CLOSED;
-            }
-
-            if (len >= 0)
+            if (len == 0) // EOF.
+                changeState(EofFlushWrites);
+            else if (len >= 0)
             {
                 std::cerr << "#" << getFD() << " read " << len
                       << " to queue: " << _chunks.size() << "\n";
@@ -130,17 +144,21 @@ public:
                 if (_dest)
                     _dest->_chunks.push_back(chunk);
                 else
-                    std::cerr << "no destination for data\n";
+                    assert("no destination for data" && false);
             }
             else if (errno != EAGAIN && errno != EWOULDBLOCK)
             {
                 std::cerr << "#" << getFD() << " error : " << errno << " " << strerror(errno) << "\n";
-                pushCloseChunk(true);
+                changeState(Closed); // FIXME - propagate the error ?
             }
         }
 
-        // Write if we have delayed enough.
-        if (_chunks.size() > 0)
+        if (_chunks.size() == 0)
+        {
+            if (_state == EofFlushWrites)
+                changeState(Closed);
+        }
+        else // Write if we have delayed enough.
         {
             std::shared_ptr<WriteChunk> chunk = *_chunks.begin();
             if (std::chrono::duration_cast<std::chrono::milliseconds>(
@@ -149,59 +167,54 @@ public:
                 if (chunk->_data.size() == 0)
                 { // delayed error or close
                     std::cerr << "#" << getFD() << " handling delayed close\n";
-                    _closed = true;
-                    _dest = nullptr;
-                    shutdown();
-                    return HandleResult::SOCKET_CLOSED;
+                    changeState(Closed);
                 }
-
-                ssize_t len;
-                do {
-                    len = ::write(getFD(), &chunk->_data[0], chunk->_data.size());
-                } while (len < 0 && errno == EINTR);
-
-                if (len < 0)
+                else
                 {
-                    if (errno == EAGAIN || errno == EWOULDBLOCK)
+                    ssize_t len;
+                    do {
+                        len = ::write(getFD(), &chunk->_data[0], chunk->_data.size());
+                    } while (len < 0 && errno == EINTR);
+
+                    if (len < 0)
                     {
-                        std::cerr << "#" << getFD() << " full - waiting for write\n";
+                        if (errno == EAGAIN || errno == EWOULDBLOCK)
+                        {
+                            std::cerr << "#" << getFD() << " full - waiting for write\n";
+                        }
+                        else
+                        {
+                            std::cerr << "#" << getFD() << " failed onwards write " << len << "bytes of "
+                                      << chunk->_data.size()
+                                  << " queue: " << _chunks.size() << " error " << strerror(errno) << "\n";
+                            changeState(Closed);
+                        }
                     }
                     else
                     {
-                        std::cerr << "#" << getFD() << " failed onwards write " << len << "bytes of "
+                        std::cerr << "#" << getFD() << " written onwards " << len << "bytes of "
                                   << chunk->_data.size()
-                                  << " queue: " << _chunks.size() << " error " << strerror(errno) << "\n";
-                        // URGH - cut and paste ...
-                        _closed = true;
-                        _dest = nullptr;
-                        // FIXME: tell dest we're dead ... [!] ...
-                        shutdown();
-                        return HandleResult::SOCKET_CLOSED;
-                    }
-                }
-                else
-                {
-                    std::cerr << "#" << getFD() << " written onwards " << len << "bytes of "
-                              << chunk->_data.size()
-                              << " queue: " << _chunks.size() << "\n";
-                    if (len > 0)
-                    {
-                        chunk->_data.erase(chunk->_data.begin(), chunk->_data.begin() + len);
-                    }
+                                  << " queue: " << _chunks.size() << "\n";
+                        if (len > 0)
+                            chunk->_data.erase(chunk->_data.begin(), chunk->_data.begin() + len);
 
-                    if (chunk->_data.size() == 0)
-                        _chunks.erase(_chunks.begin(), _chunks.begin() + 1);
+                        if (chunk->_data.size() == 0)
+                            _chunks.erase(_chunks.begin(), _chunks.begin() + 1);
+                    }
                 }
             }
         }
 
-        // FIXME: ideally we could avoid polling & delay _closed state etc.
         if (events & (POLLERR | POLLHUP | POLLNVAL))
         {
             std::cerr << "#" << getFD() << " error events: " << events << "\n";
-            pushCloseChunk(true);
+            changeState(Closed);
         }
-        return HandleResult::CONTINUE;
+
+        if (_state == Closed)
+            return HandleResult::SOCKET_CLOSED;
+        else
+            return HandleResult::CONTINUE;
     }
 };
 
commit 26a6d514d58f8aa3255c6a424b8e27af66416bdb
Author: Michael Meeks <michael.meeks at collabora.com>
Date:   Fri Apr 21 22:37:55 2017 +0100

    DelaySocket - simplified but working.
    
    Assume for now that we have infinite space in-transit.

diff --git a/net/DelaySocket.cpp b/net/DelaySocket.cpp
index 52539e7d..a27c618f 100644
--- a/net/DelaySocket.cpp
+++ b/net/DelaySocket.cpp
@@ -22,34 +22,33 @@ class DelaySocket : public Socket {
     bool _closed;
     bool _stopPoll;
     bool _waitForWrite;
-    std::weak_ptr<DelaySocket> _dest;
+    std::shared_ptr<DelaySocket> _dest;
 
     const size_t WindowSize = 64 * 1024;
 
-    struct DelayChunk {
+    /// queued up data - sent to us by our opposite twin.
+    struct WriteChunk {
         std::chrono::steady_clock::time_point _sendTime;
         std::vector<char> _data;
-        DelayChunk(int delayMs)
+        WriteChunk(int delayMs)
         {
             _sendTime = std::chrono::steady_clock::now() +
                 std::chrono::milliseconds(delayMs);
         }
         bool isError() { return _data.size() == 0; }
     private:
-        DelayChunk();
+        WriteChunk();
     };
 
-    size_t _chunksSize;
-    std::vector<std::shared_ptr<DelayChunk>> _chunks;
+    std::vector<std::shared_ptr<WriteChunk>> _chunks;
 public:
     DelaySocket(int delayMs, int fd) :
         Socket (fd), _delayMs(delayMs), _closed(false),
-        _stopPoll(false), _waitForWrite(false),
-        _chunksSize(0)
+        _stopPoll(false), _waitForWrite(false)
 	{
 //        setSocketBufferSize(Socket::DefaultSendBufferSize);
 	}
-    void setDestination(const std::weak_ptr<DelaySocket> &dest)
+    void setDestination(const std::shared_ptr<DelaySocket> &dest)
     {
         _dest = dest;
     }
@@ -74,71 +73,64 @@ public:
     int getPollEvents(std::chrono::steady_clock::time_point now,
                       int &timeoutMaxMs) override
     {
-        auto dest = _dest.lock();
-
-        bool bOtherIsWriteBlocked = !dest || dest->_waitForWrite;
-        bool bWeAreReadBlocked = _chunksSize >= WindowSize;
-
-        if (_chunks.size() > 0 && (!bOtherIsWriteBlocked || !bWeAreReadBlocked))
+        if (_chunks.size() > 0)
         {
             int remainingMs = std::chrono::duration_cast<std::chrono::milliseconds>(
                 (*_chunks.begin())->_sendTime - now).count();
             if (remainingMs < timeoutMaxMs)
                 std::cerr << "#" << getFD() << " reset timeout max to " << remainingMs
-                          << "ms from " << timeoutMaxMs << "ms "
-                          << "owb: " << bOtherIsWriteBlocked << " rb: "
-                          << bWeAreReadBlocked << "\n";
+                          << "ms from " << timeoutMaxMs << "ms\n";
             timeoutMaxMs = std::min(timeoutMaxMs, remainingMs);
         }
 
-        if (_stopPoll)
-            return -1;
-
-        int events = 0;
-
-        if (!bWeAreReadBlocked)
-            events |= POLLIN;
-
-        // NB. controlled by the other socket.
-        if (_waitForWrite)
-            events |= POLLOUT;
-
-        return events;
+        if (_chunks.size() > 0 &&
+            now > (*_chunks.begin())->_sendTime)
+            return POLLIN | POLLOUT;
+        else
+            return POLLIN;
     }
 
     void pushCloseChunk(bool bErrorSocket)
     {
         // socket in error state ? don't keep polling it.
         _stopPoll |= bErrorSocket;
-        _chunks.push_back(std::make_shared<DelayChunk>(_delayMs));
+        _chunks.push_back(std::make_shared<WriteChunk>(_delayMs));
     }
 
     HandleResult handlePoll(std::chrono::steady_clock::time_point now, int events) override
     {
-        auto dest = _dest.lock();
-
         if (events & POLLIN)
         {
-            auto chunk = std::make_shared<DelayChunk>(_delayMs);
+            auto chunk = std::make_shared<WriteChunk>(_delayMs);
 
             char buf[64 * 1024];
             ssize_t len;
-            size_t toRead = std::min(sizeof(buf), WindowSize - _chunksSize);
-            if (_closed)
-            { // get last data before async close
-                toRead = sizeof (buf);
-            }
+            size_t toRead = sizeof(buf); //std::min(sizeof(buf), WindowSize - _chunksSize);
             do {
                 len = ::read(getFD(), buf, toRead);
             } while (len < 0 && errno == EINTR);
 
+            if (len == 0)
+            { // EOF.
+                if (_dest) // FIXME: cut and paste ...
+                {
+                    _dest->pushCloseChunk(false);
+                    _dest = nullptr;
+                }
+                std::cerr << "EOF on input\n";
+                shutdown();
+                return HandleResult::SOCKET_CLOSED;
+            }
+
             if (len >= 0)
             {
                 std::cerr << "#" << getFD() << " read " << len
                       << " to queue: " << _chunks.size() << "\n";
                 chunk->_data.insert(chunk->_data.end(), &buf[0], &buf[len]);
-                _chunksSize += len;
-                _chunks.push_back(chunk);
+                if (_dest)
+                    _dest->_chunks.push_back(chunk);
+                else
+                    std::cerr << "no destination for data\n";
             }
             else if (errno != EAGAIN && errno != EWOULDBLOCK)
             {
@@ -147,65 +139,54 @@ public:
             }
         }
 
-        if (_closed)
-        {
-            std::cerr << "#" << getFD() << " closing\n";
-            dumpState(std::cerr);
-            if (dest)
-            {
-                std::cerr << "\t#" << dest->getFD() << " closing linked\n";
-                dest->dumpState(std::cerr);
-                dest->pushCloseChunk(false);
-                _dest.reset();
-            }
-            return HandleResult::SOCKET_CLOSED;
-        }
-
         // Write if we have delayed enough.
-        if (dest && _chunks.size() > 0)
+        if (_chunks.size() > 0)
         {
-            std::shared_ptr<DelayChunk> chunk = *_chunks.begin();
+            std::shared_ptr<WriteChunk> chunk = *_chunks.begin();
             if (std::chrono::duration_cast<std::chrono::milliseconds>(
                     now - chunk->_sendTime).count() >= 0)
             {
-                dest->_waitForWrite = false;
-
                 if (chunk->_data.size() == 0)
                 { // delayed error or close
-                    std::cerr << "#" << getFD() << " handling delayed close with " << _chunksSize << "bytes left\n";
+                    std::cerr << "#" << getFD() << " handling delayed close\n";
                     _closed = true;
-                    return HandleResult::CONTINUE;
+                    _dest = nullptr;
+                    shutdown();
+                    return HandleResult::SOCKET_CLOSED;
                 }
 
                 ssize_t len;
                 do {
-                    len = ::write(dest->getFD(), &chunk->_data[0], chunk->_data.size());
+                    len = ::write(getFD(), &chunk->_data[0], chunk->_data.size());
                 } while (len < 0 && errno == EINTR);
 
                 if (len < 0)
                 {
                     if (errno == EAGAIN || errno == EWOULDBLOCK)
                     {
-                        dest->_waitForWrite = true;
-                        std::cerr << "#" << dest->getFD() << " full - waiting for write ultimately from fd #" << getFD() << "\n";
+                        std::cerr << "#" << getFD() << " full - waiting for write\n";
                     }
                     else
                     {
-                        std::cerr << "#" << dest->getFD() << " failed onwards write " << len << "bytes of "
-                                  << chunk->_data.size() << " ultimately from fd #" << getFD()
+                        std::cerr << "#" << getFD() << " failed onwards write " << len << "bytes of "
+                                  << chunk->_data.size()
                                   << " queue: " << _chunks.size() << " error " << strerror(errno) << "\n";
-                        dest->pushCloseChunk(false);
+                        // URGH - cut and paste ...
+                        _closed = true;
+                        _dest = nullptr;
+                        // FIXME: tell dest we're dead ... [!] ...
+                        shutdown();
+                        return HandleResult::SOCKET_CLOSED;
                     }
                 }
                 else
                 {
-                    std::cerr << "#" << dest->getFD() << " written onwards " << len << "bytes of "
-                              << chunk->_data.size() << " ultimately from fd #" << getFD()
+                    std::cerr << "#" << getFD() << " written onwards " << len << "bytes of "
+                              << chunk->_data.size()
                               << " queue: " << _chunks.size() << "\n";
                     if (len > 0)
                     {
                         chunk->_data.erase(chunk->_data.begin(), chunk->_data.begin() + len);
-                        _chunksSize -= len;
                     }
 
                     if (chunk->_data.size() == 0)
commit 1c7f94045a433625f9ec55b137bbd0322f27d532
Author: Michael Meeks <michael.meeks at collabora.com>
Date:   Thu Mar 23 17:14:51 2017 +0000

    Initial DelaySocket goodness.

diff --git a/Makefile.am b/Makefile.am
index a348f3f4..c72ad4d0 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -53,6 +53,7 @@ shared_sources = common/FileUtil.cpp \
                  common/Unit.cpp \
                  common/UnitHTTP.cpp \
                  common/Util.cpp \
+                 net/DelaySocket.cpp \
                  net/Socket.cpp
 if ENABLE_SSL
 shared_sources += net/Ssl.cpp
@@ -162,6 +163,7 @@ shared_headers = common/Common.hpp \
                  common/SigUtil.hpp \
                  common/security.h \
                  common/SpookyV2.h \
+                 net/DelaySocket.hpp \
                  net/ServerSocket.hpp \
                  net/Socket.hpp \
                  net/WebSocketHandler.hpp \
diff --git a/net/DelaySocket.cpp b/net/DelaySocket.cpp
new file mode 100644
index 00000000..52539e7d
--- /dev/null
+++ b/net/DelaySocket.cpp
@@ -0,0 +1,264 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+#include "config.h"
+
+#include "net/DelaySocket.hpp"
+
+class Delayer;
+
+// FIXME: TerminatingPoll ?
+static SocketPoll DelayPoll("delay_poll");
+
+/// Reads from fd, delays that and then writes to _dest.
+class DelaySocket : public Socket {
+    int _delayMs;
+    bool _closed;
+    bool _stopPoll;
+    bool _waitForWrite;
+    std::weak_ptr<DelaySocket> _dest;
+
+    const size_t WindowSize = 64 * 1024;
+
+    struct DelayChunk {
+        std::chrono::steady_clock::time_point _sendTime;
+        std::vector<char> _data;
+        DelayChunk(int delayMs)
+        {
+            _sendTime = std::chrono::steady_clock::now() +
+                std::chrono::milliseconds(delayMs);
+        }
+        bool isError() { return _data.size() == 0; }
+    private:
+        DelayChunk();
+    };
+
+    size_t _chunksSize;
+    std::vector<std::shared_ptr<DelayChunk>> _chunks;
+public:
+    DelaySocket(int delayMs, int fd) :
+        Socket (fd), _delayMs(delayMs), _closed(false),
+        _stopPoll(false), _waitForWrite(false),
+        _chunksSize(0)
+	{
+//        setSocketBufferSize(Socket::DefaultSendBufferSize);
+	}
+    void setDestination(const std::weak_ptr<DelaySocket> &dest)
+    {
+        _dest = dest;
+    }
+
+    void dumpState(std::ostream& os) override
+    {
+        os << "\tfd: " << getFD()
+           << "\n\tqueue: " << _chunks.size() << "\n";
+        auto now = std::chrono::steady_clock::now();
+        for (auto &chunk : _chunks)
+        {
+            os << "\t\tin: " <<
+                std::chrono::duration_cast<std::chrono::milliseconds>(
+                    chunk->_sendTime - now).count() << "ms - "
+               << chunk->_data.size() << "bytes\n";
+        }
+    }
+
+    // FIXME - really need to propagate 'noDelay' etc.
+    // have a debug only lookup of delayed sockets for this case ?
+
+    int getPollEvents(std::chrono::steady_clock::time_point now,
+                      int &timeoutMaxMs) override
+    {
+        auto dest = _dest.lock();
+
+        bool bOtherIsWriteBlocked = !dest || dest->_waitForWrite;
+        bool bWeAreReadBlocked = _chunksSize >= WindowSize;
+
+        if (_chunks.size() > 0 && (!bOtherIsWriteBlocked || !bWeAreReadBlocked))
+        {
+            int remainingMs = std::chrono::duration_cast<std::chrono::milliseconds>(
+                (*_chunks.begin())->_sendTime - now).count();
+            if (remainingMs < timeoutMaxMs)
+                std::cerr << "#" << getFD() << " reset timeout max to " << remainingMs
+                          << "ms from " << timeoutMaxMs << "ms "
+                          << "owb: " << bOtherIsWriteBlocked << " rb: "
+                          << bWeAreReadBlocked << "\n";
+            timeoutMaxMs = std::min(timeoutMaxMs, remainingMs);
+        }
+
+        if (_stopPoll)
+            return -1;
+
+        int events = 0;
+
+        if (!bWeAreReadBlocked)
+            events |= POLLIN;
+
+        // NB. controlled by the other socket.
+        if (_waitForWrite)
+            events |= POLLOUT;
+
+        return events;
+    }
+
+    void pushCloseChunk(bool bErrorSocket)
+    {
+        // socket in error state ? don't keep polling it.
+        _stopPoll |= bErrorSocket;
+        _chunks.push_back(std::make_shared<DelayChunk>(_delayMs));
+    }
+
+    HandleResult handlePoll(std::chrono::steady_clock::time_point now, int events) override
+    {
+        auto dest = _dest.lock();
+
+        if (events & POLLIN)
+        {
+            auto chunk = std::make_shared<DelayChunk>(_delayMs);
+
+            char buf[64 * 1024];
+            ssize_t len;
+            size_t toRead = std::min(sizeof(buf), WindowSize - _chunksSize);
+            if (_closed)
+            { // get last data before async close
+                toRead = sizeof (buf);
+            }
+            do {
+                len = ::read(getFD(), buf, toRead);
+            } while (len < 0 && errno == EINTR);
+
+            if (len >= 0)
+            {
+                std::cerr << "#" << getFD() << " read " << len
+                      << " to queue: " << _chunks.size() << "\n";
+                chunk->_data.insert(chunk->_data.end(), &buf[0], &buf[len]);
+                _chunksSize += len;
+                _chunks.push_back(chunk);
+            }
+            else if (errno != EAGAIN && errno != EWOULDBLOCK)
+            {
+                std::cerr << "#" << getFD() << " error : " << errno << " " << strerror(errno) << "\n";
+                pushCloseChunk(true);
+            }
+        }
+
+        if (_closed)
+        {
+            std::cerr << "#" << getFD() << " closing\n";
+            dumpState(std::cerr);
+            if (dest)
+            {
+                std::cerr << "\t#" << dest->getFD() << " closing linked\n";
+                dest->dumpState(std::cerr);
+                dest->pushCloseChunk(false);
+                _dest.reset();
+            }
+            return HandleResult::SOCKET_CLOSED;
+        }
+
+        // Write if we have delayed enough.
+        if (dest && _chunks.size() > 0)
+        {
+            std::shared_ptr<DelayChunk> chunk = *_chunks.begin();
+            if (std::chrono::duration_cast<std::chrono::milliseconds>(
+                    now - chunk->_sendTime).count() >= 0)
+            {
+                dest->_waitForWrite = false;
+
+                if (chunk->_data.size() == 0)
+                { // delayed error or close
+                    std::cerr << "#" << getFD() << " handling delayed close with " << _chunksSize << "bytes left\n";
+                    _closed = true;
+                    return HandleResult::CONTINUE;
+                }
+
+                ssize_t len;
+                do {
+                    len = ::write(dest->getFD(), &chunk->_data[0], chunk->_data.size());
+                } while (len < 0 && errno == EINTR);
+
+                if (len < 0)
+                {
+                    if (errno == EAGAIN || errno == EWOULDBLOCK)
+                    {
+                        dest->_waitForWrite = true;
+                        std::cerr << "#" << dest->getFD() << " full - waiting for write ultimately from fd #" << getFD() << "\n";
+                    }
+                    else
+                    {
+                        std::cerr << "#" << dest->getFD() << " failed onwards write " << len << "bytes of "
+                                  << chunk->_data.size() << " ultimately from fd #" << getFD()
+                                  << " queue: " << _chunks.size() << " error " << strerror(errno) << "\n";
+                        dest->pushCloseChunk(false);
+                    }
+                }
+                else
+                {
+                    std::cerr << "#" << dest->getFD() << " written onwards " << len << "bytes of "
+                              << chunk->_data.size() << " ultimately from fd #" << getFD()
+                              << " queue: " << _chunks.size() << "\n";
+                    if (len > 0)
+                    {
+                        chunk->_data.erase(chunk->_data.begin(), chunk->_data.begin() + len);
+                        _chunksSize -= len;
+                    }
+
+                    if (chunk->_data.size() == 0)
+                        _chunks.erase(_chunks.begin(), _chunks.begin() + 1);
+                }
+            }
+        }
+
+        // FIXME: ideally we could avoid polling & delay _closed state etc.
+        if (events & (POLLERR | POLLHUP | POLLNVAL))
+        {
+            std::cerr << "#" << getFD() << " error events: " << events << "\n";
+            pushCloseChunk(true);
+        }
+        return HandleResult::CONTINUE;
+    }
+};
+
+/// Delayer:
+///
+/// Some terminology:
+///    physical socket (DelaySocket's own fd) - what we accepted.
+///    internalFd - the internal side of the socket-pair
+///    delayFd - what we hand on to our un-suspecting wrapped socket
+///              which looks like an external socket - but delayed.
+namespace Delay {
+    int create(int delayMs, int physicalFd)
+    {
+        int pair[2];
+        int rc = socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0, pair);
+        assert (rc == 0);
+        int internalFd = pair[0];
+        int delayFd = pair[1];
+
+        auto physical = std::make_shared<DelaySocket>(delayMs, physicalFd);
+        auto internal = std::make_shared<DelaySocket>(delayMs, internalFd);
+        physical->setDestination(internal);
+        internal->setDestination(physical);
+
+        DelayPoll.startThread();
+        DelayPoll.insertNewSocket(physical);
+        DelayPoll.insertNewSocket(internal);
+
+        return delayFd;
+    }
+    void dumpState(std::ostream &os)
+    {
+        if (DelayPoll.isAlive())
+        {
+            os << "Delay poll:\n";
+            DelayPoll.dumpState(os);
+        }
+    }
+}
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/net/DelaySocket.hpp b/net/DelaySocket.hpp
new file mode 100644
index 00000000..df423b52
--- /dev/null
+++ b/net/DelaySocket.hpp
@@ -0,0 +1,27 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+#ifndef INCLUDED_DELAY_SOCKET_HPP
+#define INCLUDED_DELAY_SOCKET_HPP
+
+#include <Socket.hpp>
+
+/// Simulates network latency for local debugging.
+///
+/// We are lifecycle managed internally based on the physical /
+/// delayFd lifecycle.
+namespace Delay
+{
+    int create(int delayMs, int physicalFd); ///< Returns the fd to hand to the wrapped socket in place of physicalFd.
+    void dumpState(std::ostream &os); ///< Writes the delay poll's state to os, but only if that poll is alive.
+};
+
+#endif
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/net/Socket.hpp b/net/Socket.hpp
index 8c07e38d..0e616ddf 100644
--- a/net/Socket.hpp
+++ b/net/Socket.hpp
@@ -84,7 +84,7 @@ public:
     virtual HandleResult handlePoll(std::chrono::steady_clock::time_point now, int events) = 0;
 
     /// manage latency issues around packet aggregation
-    void setNoDelay()
+    virtual void setNoDelay()
     {
         const int val = 1;
         setsockopt (_fd, IPPROTO_TCP, TCP_NODELAY,
@@ -182,7 +182,7 @@ public:
     virtual void dumpState(std::ostream&) {}
 
     /// Set the thread-id we're bound to
-    void setThreadOwner(const std::thread::id &id)
+    virtual void setThreadOwner(const std::thread::id &id)
     {
         if (id != _owner)
         {
@@ -518,8 +518,17 @@ private:
 
         for (size_t i = 0; i < size; ++i)
         {
-            _pollFds[i].fd = _pollSockets[i]->getFD();
-            _pollFds[i].events = _pollSockets[i]->getPollEvents(now, timeoutMaxMs);
+            int events = _pollSockets[i]->getPollEvents(now, timeoutMaxMs);
+            if (events < 0) // timeout on dead socket
+            {
+                _pollFds[i].fd = _wakeup[0];
+                _pollFds[i].events = 0;
+            }
+            else
+            {
+                _pollFds[i].fd = _pollSockets[i]->getFD();
+                _pollFds[i].events = events;
+            }
             _pollFds[i].revents = 0;
         }
 
diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp
index a0529380..f1bb28ea 100644
--- a/wsd/LOOLWSD.cpp
+++ b/wsd/LOOLWSD.cpp
@@ -100,6 +100,7 @@
 #if ENABLE_SSL
 #  include "SslSocket.hpp"
 #endif
+#include "DelaySocket.hpp"
 #include "Storage.hpp"
 #include "TraceFile.hpp"
 #include "Unit.hpp"
@@ -167,6 +168,9 @@ int MasterPortNumber = DEFAULT_MASTER_PORT_NUMBER;
 //TODO: Move to a more sensible namespace.
 static bool DisplayVersion = false;
 
+/// Funky latency simulation basic delay (ms)
+static int SimulatedLatencyMs = 150;
+
 // Tracks the set of prisoners / children waiting to be used.
 static std::mutex NewChildrenMutex;
 static std::condition_variable NewChildrenCV;
@@ -2113,17 +2117,32 @@ private:
 
 class PlainSocketFactory : public SocketFactory
 {
-    std::shared_ptr<Socket> create(const int fd) override
+    std::shared_ptr<Socket> create(const int physicalFd) override
     {
-        return StreamSocket::create<StreamSocket>(fd, std::unique_ptr<SocketHandlerInterface>{ new ClientRequestDispatcher });
+        int fd = physicalFd; // fd given to the StreamSocket; stays physicalFd unless replaced below.
+
+        if (SimulatedLatencyMs > 0) // a value <= 0 skips the delay wrapper entirely.
+            fd = Delay::create(SimulatedLatencyMs, physicalFd);
+
+        std::shared_ptr<Socket> socket =
+            StreamSocket::create<StreamSocket>(
+                fd, std::unique_ptr<SocketHandlerInterface>{
+                    new ClientRequestDispatcher });
+
+        return socket;
     }
 };
 
 #if ENABLE_SSL
 class SslSocketFactory : public SocketFactory
 {
-    std::shared_ptr<Socket> create(const int fd) override
+    std::shared_ptr<Socket> create(const int physicalFd) override
     {
+        int fd = physicalFd; // fd given to the SslStreamSocket; stays physicalFd unless replaced below.
+
+        if (SimulatedLatencyMs > 0) // a value <= 0 skips the delay wrapper entirely.
+            fd = Delay::create(SimulatedLatencyMs, physicalFd);
+
         return StreamSocket::create<SslStreamSocket>(fd, std::unique_ptr<SocketHandlerInterface>{ new ClientRequestDispatcher });
     }
 };
@@ -2133,6 +2152,7 @@ class PrisonerSocketFactory : public SocketFactory
 {
     std::shared_ptr<Socket> create(const int fd) override
     {
+        // No local delay: fd is wrapped as-is, Delay::create is never called here.
         return StreamSocket::create<StreamSocket>(fd, std::unique_ptr<SocketHandlerInterface>{ new PrisonerRequestDispatcher });
     }
 };
@@ -2207,6 +2227,9 @@ public:
         os << "Admin poll:\n";
         Admin::instance().dumpState(os);
 
+        // If we have any delaying work going on.
+        Delay::dumpState(os);
+
         os << "Document Broker polls "
                   << "[ " << DocBrokers.size() << " ]:\n";
         for (auto &i : DocBrokers)


More information about the Libreoffice-commits mailing list