[Libreoffice-commits] online.git: Branch 'private/Ashod/nonblocking' - net/clientnb.cpp net/loolnb.cpp net/socket.hpp
Michael Meeks
michael.meeks at collabora.com
Tue Feb 14 23:47:49 UTC 2017
net/clientnb.cpp | 3 -
net/loolnb.cpp | 125 +++++++++++++++++++------------------------------------
net/socket.hpp | 55 ++++++++++++++++++------
3 files changed, 88 insertions(+), 95 deletions(-)
New commits:
commit ded9607faf8e3e1fa122b200353a3465c94347ae
Author: Michael Meeks <michael.meeks at collabora.com>
Date: Tue Feb 14 23:45:24 2017 +0000
Implement basic buffering.
The socket now buffers input, and output, updates its poll record too.
We pass a simple message from client to server and back using lamers HTTP.
Sub-classed ClientSocket to provide a simple message handler.
not very convinced by templatization here, but made it consistent.
more ideal to have some virtual socket pieces.
diff --git a/net/clientnb.cpp b/net/clientnb.cpp
index e4a7fc3..ec7c578 100644
--- a/net/clientnb.cpp
+++ b/net/clientnb.cpp
@@ -82,7 +82,8 @@ public:
std::cerr << "try to get response\n";
std::istream& responseStream = session->receiveResponse(response);
- std::cerr << "Got response '" << responseStream << "'\n";
+ std::string result(std::istreambuf_iterator<char>(responseStream), {});
+ std::cerr << "Got response '" << result << "'\n";
}
catch (const Poco::Exception &e)
{
diff --git a/net/loolnb.cpp b/net/loolnb.cpp
index 952c44a..d457471 100644
--- a/net/loolnb.cpp
+++ b/net/loolnb.cpp
@@ -24,6 +24,34 @@
constexpr int PortNumber = 9191;
+class SimpleResponseClient : public ClientSocket
+{
+public:
+ SimpleResponseClient(const int fd) :
+ ClientSocket(fd)
+ {
+ }
+ virtual void handleIncomingMessage() override
+ {
+ std::cerr << "message had size " << _inBuffer.size() << "\n";
+ std::ostringstream oss;
+ oss << "HTTP/1.1 200 OK\r\n"
+ << "Date: Once, Upon a time GMT\r\n" // Mon, 27 Jul 2009 12:28:53 GMT
+ << "Server: madeup string (Linux)\r\n"
+ << "Content-Length: " << _inBuffer.size() << "\r\n"
+ << "Content-Type: text/plain\r\n"
+ << "Connection: Closed\r\n"
+ << "\r\n"
+ ;
+ std::string str = oss.str();
+ _outBuffer.insert(_outBuffer.end(), str.begin(), str.end());
+
+ // append the content we got:
+ _outBuffer.insert(_outBuffer.end(), _inBuffer.begin(), _inBuffer.end());
+ _inBuffer.clear();
+ }
+};
+
/// Handles non-blocking socket event polling.
/// Only polls on N-Sockets and invokes callback and
/// doesn't manage buffers or client data.
@@ -102,7 +130,7 @@ public:
/// Insert a new socket to be polled.
/// Sockets are removed only when the handler return false.
- void insertNewSocket(const std::shared_ptr<ClientSocket>& newSocket)
+ void insertNewSocket(const std::shared_ptr<T>& newSocket)
{
std::lock_guard<std::mutex> lock(_mutex);
@@ -138,8 +166,7 @@ private:
for (size_t i = 0; i < size; ++i)
{
_pollFds[i].fd = _pollSockets[i]->getFD();
- //TODO: Get from the socket.
- _pollFds[i].events = POLLIN | POLLOUT;
+ _pollFds[i].events = _pollSockets[i]->getPollEvents();
_pollFds[i].revents = 0;
}
@@ -153,10 +180,10 @@ private:
/// main-loop wakeup pipe
int _wakeup[2];
/// The sockets we're controlling
- std::vector<std::shared_ptr<ClientSocket>> _pollSockets;
+ std::vector<std::shared_ptr<T>> _pollSockets;
/// Protects _newSockets
std::mutex _mutex;
- std::vector<std::shared_ptr<ClientSocket>> _newSockets;
+ std::vector<std::shared_ptr<T>> _newSockets;
/// The fds to poll.
std::vector<pollfd> _pollFds;
};
@@ -197,39 +224,7 @@ private:
Poco::Net::SocketAddress addr("127.0.0.1", PortNumber);
-void client(const int timeoutMs)
-{
- const auto client = std::make_shared<ClientSocket>();
- if (!client->connect(addr, timeoutMs) && errno != EINPROGRESS)
- {
- const std::string msg = "Failed to call connect. (errno: ";
- throw std::runtime_error(msg + std::strerror(errno) + ")");
- }
-
- std::cout << "Connected " << client->getFD() << std::endl;
-
- client->send("1", 1);
- int sent = 1;
- while (sent > 0 && client->pollRead(5000))
- {
- char buf[1024];
- const int recv = client->recv(buf, sizeof(buf));
- if (recv <= 0)
- {
- perror("recv");
- break;
- }
- else
- {
- const std::string msg = std::string(buf, recv);
- const int num = stoi(msg);
- const std::string new_msg = std::to_string(num + 1);
- sent = client->send(new_msg.data(), new_msg.size());
- }
- }
-}
-
-void server(SocketPoll<ClientSocket>& poller)
+void server(SocketPoll<SimpleResponseClient>& poller)
{
// Start server.
auto server = std::make_shared<ServerSocket>();
@@ -250,7 +245,7 @@ void server(SocketPoll<ClientSocket>& poller)
{
if (server->pollRead(30000))
{
- std::shared_ptr<ClientSocket> clientSocket = server->accept();
+ std::shared_ptr<SimpleResponseClient> clientSocket = server->accept<SimpleResponseClient>();
if (!clientSocket)
{
const std::string msg = "Failed to accept. (errno: ";
@@ -264,44 +259,21 @@ void server(SocketPoll<ClientSocket>& poller)
}
/// Poll client sockets and do IO.
-void pollAndComm(SocketPoll<ClientSocket>& poller, std::atomic<bool>& stop)
+void pollAndComm(SocketPoll<SimpleResponseClient>& poller, std::atomic<bool>& stop)
{
while (!stop)
{
- poller.poll(5000, [](const std::shared_ptr<ClientSocket>& socket, const int events)
+ poller.poll(5000, [](const std::shared_ptr<SimpleResponseClient>& socket, const int events)
{
if (events & POLLIN)
- {
- char buf[1024];
- const int recv = socket->recv(buf, sizeof(buf));
- if (recv <= 0)
- {
- perror("recv");
- return false;
- }
+ socket->readIncomingData();
- if (events & POLLOUT)
- {
- const std::string msg = std::string(buf, recv);
- const int num = stoi(msg);
- if ((num % (1<<16)) == 1)
- {
- std::cout << "Client #" << socket->getFD() << ": " << msg << std::endl;
- }
- const std::string new_msg = std::to_string(num + 1);
- const int sent = socket->send(new_msg.data(), new_msg.size());
- if (sent != static_cast<int>(new_msg.size()))
- {
- perror("send");
- return false;
- }
- }
- else
- {
- // Normally we'd buffer the response, but for now...
- std::cerr << "Client #" << socket->getFD()
- << ": ERROR - socket not ready for write." << std::endl;
- }
+ if (events & POLLOUT)
+ socket->writeOutgoingData();
+
+ if (events & (POLLHUP | POLLERR | POLLNVAL))
+ {
+ // FIXME - close and remove the socket ...
}
return true;
@@ -309,17 +281,10 @@ void pollAndComm(SocketPoll<ClientSocket>& poller, std::atomic<bool>& stop)
}
}
-int main(int argc, const char**)
+int main(int, const char**)
{
- if (argc > 1)
- {
- // We are now the client application.
- client(0);
- return 0;
- }
-
// Used to poll client sockets.
- SocketPoll<ClientSocket> poller;
+ SocketPoll<SimpleResponseClient> poller;
// Start the client polling thread.
Thread threadPoll([&poller](std::atomic<bool>& stop)
diff --git a/net/socket.hpp b/net/socket.hpp
index 83202b4..55ca6b7 100644
--- a/net/socket.hpp
+++ b/net/socket.hpp
@@ -223,25 +223,51 @@ public:
// Now check if we connected, not, or not yet.
return (getError() == 0 || errno == EINPROGRESS);
}
+ protected:
+ std::vector< unsigned char > _inBuffer;
+ std::vector< unsigned char > _outBuffer;
+ public:
+ void readIncomingData()
+ {
+ ssize_t len;
+ unsigned char buf[4096];
+ do {
+ len = ::read(getFD(), buf, sizeof(buf));
+ } while (len < 0 && errno == EINTR);
+ if (len > 0)
+ {
+ assert (len < ssize_t(sizeof(buf)));
+ _inBuffer.insert(_inBuffer.end(), &buf[0], &buf[len]);
+ handleIncomingMessage();
+ }
+ // else poll will handle errors.
+ }
- /// Send data to our peer.
- /// Returns the number of bytes sent, -1 on error.
- int send(const void* buf, const size_t len)
+ void writeOutgoingData()
{
- // Don't SIGPIPE when the other end closes.
- const int rc = ::send(getFD(), buf, len, MSG_NOSIGNAL);
- return rc;
+ assert (_outBuffer.size() > 0);
+ ssize_t len;
+ do {
+ len = ::write(getFD(), &_outBuffer[0], _outBuffer.size());
+ } while (len < 0 && errno == EINTR);
+ if (len > 0)
+ {
+ _outBuffer.erase(_outBuffer.begin(),
+ _outBuffer.begin() + len);
+ }
+ // else poll will handle errors
}
- /// Receive data from our peer.
- /// Returns the number of bytes received, -1 on error,
- /// and 0 when the peer has performed an orderly shutdown.
- int recv(void* buf, const size_t len)
+ int getPollEvents()
{
- const int rc = ::recv(getFD(), buf, len, 0);
- return rc;
+ int pollFor = POLLIN | POLLPRI;
+ if (_outBuffer.size() > 0)
+ pollFor |= POLLOUT;
+ return pollFor;
}
+ virtual void handleIncomingMessage() = 0;
+
protected:
ClientSocket(const int fd) :
Socket(fd)
@@ -284,12 +310,13 @@ public:
/// Accepts an incoming connection (Servers only).
/// Does not retry on error.
/// Returns a valid Socket shared_ptr on success only.
- std::shared_ptr<ClientSocket> accept()
+ template <typename T>
+ std::shared_ptr<T> accept()
{
// Accept a connection (if any) and set it to non-blocking.
// We don't care about the client's address, so ignored.
const int rc = ::accept4(getFD(), nullptr, nullptr, SOCK_NONBLOCK);
- return std::shared_ptr<ClientSocket>(rc != -1 ? new ClientSocket(rc) : nullptr);
+ return std::shared_ptr<T>(rc != -1 ? new T(rc) : nullptr);
}
};
More information about the Libreoffice-commits
mailing list