[Libreoffice-commits] online.git: Branch 'private/mmeeks/delaysocket' - 2 commits - net/DelaySocket.cpp

Michael Meeks michael.meeks at collabora.com
Sat Apr 22 17:41:58 UTC 2017


 net/DelaySocket.cpp |  158 ++++++++++++++++++++++++++++------------------------
 1 file changed, 87 insertions(+), 71 deletions(-)

New commits:
commit 7b24ecd447bf6f9d8066a9c1547aab6e6f193502
Author: Michael Meeks <michael.meeks at collabora.com>
Date:   Sat Apr 22 18:37:38 2017 +0100

    DelaySocket: hide logging.

diff --git a/net/DelaySocket.cpp b/net/DelaySocket.cpp
index d2b0f7db..29a7fc1d 100644
--- a/net/DelaySocket.cpp
+++ b/net/DelaySocket.cpp
@@ -11,6 +11,8 @@
 
 #include "net/DelaySocket.hpp"
 
+#define DELAY_LOG(X) std::cerr << X << "\n";
+
 class Delayer;
 
 // FIXME: TerminatingPoll ?
@@ -79,8 +81,8 @@ public:
             int remainingMs = std::chrono::duration_cast<std::chrono::milliseconds>(
                 (*_chunks.begin())->_sendTime - now).count();
             if (remainingMs < timeoutMaxMs)
-                std::cerr << "#" << getFD() << " reset timeout max to " << remainingMs
-                          << "ms from " << timeoutMaxMs << "ms\n";
+                DELAY_LOG("#" << getFD() << " reset timeout max to " << remainingMs
+                          << "ms from " << timeoutMaxMs << "ms\n");
             timeoutMaxMs = std::min(timeoutMaxMs, remainingMs);
         }
 
@@ -116,8 +118,7 @@ public:
             shutdown();
             break;
         }
-        std::cerr << "#" << getFD() << " changed to state "
-                  << newState << "\n";
+        DELAY_LOG("#" << getFD() << " changed to state " << newState << "\n");
         _state = newState;
     }
 
@@ -138,8 +139,8 @@ public:
                 changeState(EofFlushWrites);
             else if (len >= 0)
             {
-                std::cerr << "#" << getFD() << " read " << len
-                      << " to queue: " << _chunks.size() << "\n";
+                DELAY_LOG("#" << getFD() << " read " << len
+                          << " to queue: " << _chunks.size() << "\n");
                 chunk->_data.insert(chunk->_data.end(), &buf[0], &buf[len]);
                 if (_dest)
                     _dest->_chunks.push_back(chunk);
@@ -148,7 +149,7 @@ public:
             }
             else if (errno != EAGAIN && errno != EWOULDBLOCK)
             {
-                std::cerr << "#" << getFD() << " error : " << errno << " " << strerror(errno) << "\n";
+                DELAY_LOG("#" << getFD() << " error : " << errno << " " << strerror(errno) << "\n");
                 changeState(Closed); // FIXME - propagate the error ?
             }
         }
@@ -166,7 +167,7 @@ public:
             {
                 if (chunk->_data.size() == 0)
                 { // delayed error or close
-                    std::cerr << "#" << getFD() << " handling delayed close\n";
+                    DELAY_LOG("#" << getFD() << " handling delayed close\n");
                     changeState(Closed);
                 }
                 else
@@ -180,21 +181,23 @@ public:
                     {
                         if (errno == EAGAIN || errno == EWOULDBLOCK)
                         {
-                            std::cerr << "#" << getFD() << " full - waiting for write\n";
+                            DELAY_LOG("#" << getFD() << " full - waiting for write\n");
                         }
                         else
                         {
-                            std::cerr << "#" << getFD() << " failed onwards write " << len << "bytes of "
+                            DELAY_LOG("#" << getFD() << " failed onwards write "
+                                      << len << "bytes of "
                                       << chunk->_data.size()
-                                  << " queue: " << _chunks.size() << " error " << strerror(errno) << "\n";
+                                      << " queue: " << _chunks.size() << " error "
+                                      << strerror(errno) << "\n");
                             changeState(Closed);
                         }
                     }
                     else
                     {
-                        std::cerr << "#" << getFD() << " written onwards " << len << "bytes of "
+                        DELAY_LOG("#" << getFD() << " written onwards " << len << "bytes of "
                                   << chunk->_data.size()
-                                  << " queue: " << _chunks.size() << "\n";
+                                  << " queue: " << _chunks.size() << "\n");
                         if (len > 0)
                             chunk->_data.erase(chunk->_data.begin(), chunk->_data.begin() + len);
 
@@ -207,7 +210,7 @@ public:
 
         if (events & (POLLERR | POLLHUP | POLLNVAL))
         {
-            std::cerr << "#" << getFD() << " error events: " << events << "\n";
+            DELAY_LOG("#" << getFD() << " error events: " << events << "\n");
             changeState(Closed);
         }
 
commit efced88935d88c534bef9d7f6ae6ad7539d7b51d
Author: Michael Meeks <michael.meeks at collabora.com>
Date:   Sat Apr 22 18:30:37 2017 +0100

    DelaySocket: working, and much cleaner / simpler.

diff --git a/net/DelaySocket.cpp b/net/DelaySocket.cpp
index a27c618f..d2b0f7db 100644
--- a/net/DelaySocket.cpp
+++ b/net/DelaySocket.cpp
@@ -19,10 +19,11 @@ static SocketPoll DelayPoll("delay_poll");
 /// Reads from fd, delays that and then writes to _dest.
 class DelaySocket : public Socket {
     int _delayMs;
-    bool _closed;
-    bool _stopPoll;
-    bool _waitForWrite;
-    std::shared_ptr<DelaySocket> _dest;
+    enum State { ReadWrite,      // normal socket
+                 EofFlushWrites, // finish up writes and close
+                 Closed };
+    State _state;
+    std::shared_ptr<DelaySocket> _dest; // our writing twin.
 
     const size_t WindowSize = 64 * 1024;
 
@@ -43,8 +44,8 @@ class DelaySocket : public Socket {
     std::vector<std::shared_ptr<WriteChunk>> _chunks;
 public:
     DelaySocket(int delayMs, int fd) :
-        Socket (fd), _delayMs(delayMs), _closed(false),
-        _stopPoll(false), _waitForWrite(false)
+        Socket (fd), _delayMs(delayMs),
+        _state(ReadWrite)
 	{
 //        setSocketBufferSize(Socket::DefaultSendBufferSize);
 	}
@@ -90,16 +91,39 @@ public:
             return POLLIN;
     }
 
-    void pushCloseChunk(bool bErrorSocket)
+    void pushCloseChunk()
     {
-        // socket in error state ? don't keep polling it.
-        _stopPoll |= bErrorSocket;
         _chunks.push_back(std::make_shared<WriteChunk>(_delayMs));
     }
 
+    void changeState(State newState)
+    {
+        switch (newState)
+        {
+        case ReadWrite:
+            assert (false);
+            break;
+        case EofFlushWrites:
+            assert (_state == ReadWrite);
+            assert (_dest);
+            _dest->pushCloseChunk();
+            _dest = nullptr;
+            break;
+        case Closed:
+            if (_dest && _state == ReadWrite)
+                _dest->pushCloseChunk();
+            _dest = nullptr;
+            shutdown();
+            break;
+        }
+        std::cerr << "#" << getFD() << " changed to state "
+                  << newState << "\n";
+        _state = newState;
+    }
+
     HandleResult handlePoll(std::chrono::steady_clock::time_point now, int events) override
     {
-        if (events & POLLIN)
+        if (_state == ReadWrite && (events & POLLIN))
         {
             auto chunk = std::make_shared<WriteChunk>(_delayMs);
 
@@ -110,19 +134,9 @@ public:
                 len = ::read(getFD(), buf, toRead);
             } while (len < 0 && errno == EINTR);
 
-            if (len == 0)
-            { // EOF.
-                if (_dest) // FIXME: cut and paste ...
-                {
-                    _dest->pushCloseChunk(false);
-                    _dest = nullptr;
-                }
-                std::cerr << "EOF on input\n";
-                shutdown();
-                return HandleResult::SOCKET_CLOSED;
-            }
-
-            if (len >= 0)
+            if (len == 0) // EOF.
+                changeState(EofFlushWrites);
+            else if (len >= 0)
             {
                 std::cerr << "#" << getFD() << " read " << len
                       << " to queue: " << _chunks.size() << "\n";
@@ -130,17 +144,21 @@ public:
                 if (_dest)
                     _dest->_chunks.push_back(chunk);
                 else
-                    std::cerr << "no destination for data\n";
+                    assert("no destination for data" && false);
             }
             else if (errno != EAGAIN && errno != EWOULDBLOCK)
             {
                 std::cerr << "#" << getFD() << " error : " << errno << " " << strerror(errno) << "\n";
-                pushCloseChunk(true);
+                changeState(Closed); // FIXME - propagate the error ?
             }
         }
 
-        // Write if we have delayed enough.
-        if (_chunks.size() > 0)
+        if (_chunks.size() == 0)
+        {
+            if (_state == EofFlushWrites)
+                changeState(Closed);
+        }
+        else // Write if we have delayed enough.
         {
             std::shared_ptr<WriteChunk> chunk = *_chunks.begin();
             if (std::chrono::duration_cast<std::chrono::milliseconds>(
@@ -149,59 +167,54 @@ public:
                 if (chunk->_data.size() == 0)
                 { // delayed error or close
                     std::cerr << "#" << getFD() << " handling delayed close\n";
-                    _closed = true;
-                    _dest = nullptr;
-                    shutdown();
-                    return HandleResult::SOCKET_CLOSED;
+                    changeState(Closed);
                 }
-
-                ssize_t len;
-                do {
-                    len = ::write(getFD(), &chunk->_data[0], chunk->_data.size());
-                } while (len < 0 && errno == EINTR);
-
-                if (len < 0)
+                else
                 {
-                    if (errno == EAGAIN || errno == EWOULDBLOCK)
+                    ssize_t len;
+                    do {
+                        len = ::write(getFD(), &chunk->_data[0], chunk->_data.size());
+                    } while (len < 0 && errno == EINTR);
+
+                    if (len < 0)
                     {
-                        std::cerr << "#" << getFD() << " full - waiting for write\n";
+                        if (errno == EAGAIN || errno == EWOULDBLOCK)
+                        {
+                            std::cerr << "#" << getFD() << " full - waiting for write\n";
+                        }
+                        else
+                        {
+                            std::cerr << "#" << getFD() << " failed onwards write " << len << "bytes of "
+                                      << chunk->_data.size()
+                                  << " queue: " << _chunks.size() << " error " << strerror(errno) << "\n";
+                            changeState(Closed);
+                        }
                     }
                     else
                     {
-                        std::cerr << "#" << getFD() << " failed onwards write " << len << "bytes of "
+                        std::cerr << "#" << getFD() << " written onwards " << len << "bytes of "
                                   << chunk->_data.size()
-                                  << " queue: " << _chunks.size() << " error " << strerror(errno) << "\n";
-                        // URGH - cut and paste ...
-                        _closed = true;
-                        _dest = nullptr;
-                        // FIXME: tell dest we're dead ... [!] ...
-                        shutdown();
-                        return HandleResult::SOCKET_CLOSED;
-                    }
-                }
-                else
-                {
-                    std::cerr << "#" << getFD() << " written onwards " << len << "bytes of "
-                              << chunk->_data.size()
-                              << " queue: " << _chunks.size() << "\n";
-                    if (len > 0)
-                    {
-                        chunk->_data.erase(chunk->_data.begin(), chunk->_data.begin() + len);
-                    }
+                                  << " queue: " << _chunks.size() << "\n";
+                        if (len > 0)
+                            chunk->_data.erase(chunk->_data.begin(), chunk->_data.begin() + len);
 
-                    if (chunk->_data.size() == 0)
-                        _chunks.erase(_chunks.begin(), _chunks.begin() + 1);
+                        if (chunk->_data.size() == 0)
+                            _chunks.erase(_chunks.begin(), _chunks.begin() + 1);
+                    }
                 }
             }
         }
 
-        // FIXME: ideally we could avoid polling & delay _closed state etc.
         if (events & (POLLERR | POLLHUP | POLLNVAL))
         {
             std::cerr << "#" << getFD() << " error events: " << events << "\n";
-            pushCloseChunk(true);
+            changeState(Closed);
         }
-        return HandleResult::CONTINUE;
+
+        if (_state == Closed)
+            return HandleResult::SOCKET_CLOSED;
+        else
+            return HandleResult::CONTINUE;
     }
 };
 


More information about the Libreoffice-commits mailing list