[Libreoffice-commits] online.git: Branch 'private/Ashod/nonblocking' - net/loolnb.cpp
Ashod Nakashian
ashod.nakashian at collabora.co.uk
Tue Feb 14 03:29:32 UTC 2017
net/loolnb.cpp | 210 +++++++++++++++++++++++++++++++--------------------------
1 file changed, 117 insertions(+), 93 deletions(-)
New commits:
commit b9f4bed8ea75138ed04a66ffc6ccfd76396cc832
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Mon Feb 13 21:49:59 2017 -0500
nb: separate Socket into ClientSocket and ServerSocket
Change-Id: I1aafd6192b955e51b8f1e74c1aad5fc3603f71d6
Reviewed-on: https://gerrit.libreoffice.org/34237
Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
Tested-by: Ashod Nakashian <ashnakash at gmail.com>
diff --git a/net/loolnb.cpp b/net/loolnb.cpp
index 07c5c82..0ca4faf 100644
--- a/net/loolnb.cpp
+++ b/net/loolnb.cpp
@@ -127,87 +127,6 @@ public:
return rc;
}
- /// Connect to a server address.
- /// Does not retry on error.
- /// timeoutMs can be 0 to avoid waiting, or -1 to wait forever.
- /// Returns true on success only.
- /// Note: when succceeds, caller must check for
- /// EINPROGRESS and poll for write, then getError(),
- /// only when the latter returns 0 we are connected.
- bool connect(const SocketAddress& address, const int timeoutMs = 0)
- {
- const int rc = ::connect(_fd, address.addr(), address.length());
- if (rc == 0)
- {
- return true;
- }
-
- if (errno != EINPROGRESS)
- {
- return false;
- }
-
- // Wait for writable, then check again.
- pollWrite(timeoutMs);
-
- // Now check if we connected, not, or not yet.
- return (getError() == 0 || errno == EINPROGRESS);
- }
-
- /// Binds to a local address (Servers only).
- /// Does not retry on error.
- /// Returns true on success only.
- bool bind(const SocketAddress& address)
- {
- // Enable address reuse to avoid stalling after
- // recycling, when previous socket is TIME_WAIT.
- //TODO: Might be worth refactoring out.
- const int reuseAddress = 1;
- constexpr unsigned int len = sizeof(reuseAddress);
- ::setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR, &reuseAddress, len);
-
- const int rc = ::bind(_fd, address.addr(), address.length());
- return (rc == 0);
- }
-
- /// Listen to incoming connections (Servers only).
- /// Does not retry on error.
- /// Returns true on success only.
- bool listen(const int backlog = 64)
- {
- const int rc = ::listen(_fd, backlog);
- return (rc == 0);
- }
-
- /// Accepts an incoming connection (Servers only).
- /// Does not retry on error.
- /// Returns a valid Socket shared_ptr on success only.
- std::shared_ptr<Socket> accept()
- {
- // Accept a connection (if any) and set it to non-blocking.
- // We don't care about the client's address, so ignored.
- const int rc = ::accept4(_fd, nullptr, nullptr, SOCK_NONBLOCK);
- return std::shared_ptr<Socket>(rc != -1 ? new Socket(rc) : nullptr);
- }
-
- /// Send data to our peer.
- /// Returns the number of bytes sent, -1 on error.
- int send(const void* buf, const size_t len)
- {
- // Don't SIGPIPE when the other end closes.
- const int rc = ::send(_fd, buf, len, MSG_NOSIGNAL);
- return rc;
- }
-
- /// Receive data from our peer.
- /// Returns the number of bytes received, -1 on error,
- /// and 0 when the peer has performed an orderly shutdown.
- int recv(void* buf, const size_t len)
- {
- const int rc = ::recv(_fd, buf, len, 0);
- return rc;
- }
-
/// Poll the socket for either read, write, or both.
/// Returns -1 on failure/error (query socket error), 0 for timeout,
/// otherwise, depending on events, the respective bits set.
@@ -219,7 +138,7 @@ public:
pollfd poll;
memset(&poll, 0, sizeof(poll));
- poll.fd = _fd;
+ poll.fd = fd();
poll.events |= events;
int rc;
@@ -277,7 +196,7 @@ public:
return (rc > 0 && (rc & POLLOUT));
}
-private:
+protected:
/// Construct based on an existing socket fd.
/// Used by accept() only.
@@ -290,6 +209,111 @@ private:
const int _fd;
};
+/// A non-blocking, client socket.
+class ClientSocket : public Socket
+{
+public:
+ ClientSocket() :
+ Socket()
+ {
+ }
+
+ /// Connect to a server address.
+ /// Does not retry on error.
+ /// timeoutMs can be 0 to avoid waiting, or -1 to wait forever.
+ /// Returns true on success only.
+ /// Note: when this succeeds, the caller must check for
+ /// EINPROGRESS and poll for write, then call getError();
+ /// only when the latter returns 0 are we connected.
+ bool connect(const SocketAddress& address, const int timeoutMs = 0)
+ {
+ const int rc = ::connect(fd(), address.addr(), address.length());
+ if (rc == 0)
+ {
+ return true;
+ }
+
+ if (errno != EINPROGRESS)
+ {
+ return false;
+ }
+
+ // Wait for writable, then check again.
+ pollWrite(timeoutMs);
+
+ // Now check if we connected, not, or not yet.
+ return (getError() == 0 || errno == EINPROGRESS);
+ }
+
+ /// Send data to our peer.
+ /// Returns the number of bytes sent, -1 on error.
+ int send(const void* buf, const size_t len)
+ {
+ // Don't SIGPIPE when the other end closes.
+ const int rc = ::send(fd(), buf, len, MSG_NOSIGNAL);
+ return rc;
+ }
+
+ /// Receive data from our peer.
+ /// Returns the number of bytes received, -1 on error,
+ /// and 0 when the peer has performed an orderly shutdown.
+ int recv(void* buf, const size_t len)
+ {
+ const int rc = ::recv(fd(), buf, len, 0);
+ return rc;
+ }
+
+protected:
+ ClientSocket(const int fd) :
+ Socket(fd)
+ {
+ }
+
+ friend class ServerSocket;
+};
+
+/// A non-blocking, server (listening) socket.
+class ServerSocket : public Socket
+{
+public:
+
+ /// Binds to a local address (Servers only).
+ /// Does not retry on error.
+ /// Returns true on success only.
+ bool bind(const SocketAddress& address)
+ {
+ // Enable address reuse to avoid stalling after
+ // recycling, when previous socket is TIME_WAIT.
+ //TODO: Might be worth refactoring out.
+ const int reuseAddress = 1;
+ constexpr unsigned int len = sizeof(reuseAddress);
+ ::setsockopt(fd(), SOL_SOCKET, SO_REUSEADDR, &reuseAddress, len);
+
+ const int rc = ::bind(fd(), address.addr(), address.length());
+ return (rc == 0);
+ }
+
+ /// Listen to incoming connections (Servers only).
+ /// Does not retry on error.
+ /// Returns true on success only.
+ bool listen(const int backlog = 64)
+ {
+ const int rc = ::listen(fd(), backlog);
+ return (rc == 0);
+ }
+
+ /// Accepts an incoming connection (Servers only).
+ /// Does not retry on error.
+ /// Returns a valid ClientSocket shared_ptr on success only.
+ std::shared_ptr<ClientSocket> accept()
+ {
+ // Accept a connection (if any) and set it to non-blocking.
+ // We don't care about the client's address, so ignored.
+ const int rc = ::accept4(fd(), nullptr, nullptr, SOCK_NONBLOCK);
+ return std::shared_ptr<ClientSocket>(rc != -1 ? new ClientSocket(rc) : nullptr);
+ }
+};
+
/// Handles non-blocking socket event polling.
/// Only polls on N-Sockets and invokes callback and
/// doesn't manage buffers or client data.
@@ -373,7 +397,7 @@ public:
/// Insert a new socket to be polled.
/// Sockets are removed only when the handler return false.
- void insertNewSocket(const std::shared_ptr<Socket>& newSocket)
+ void insertNewSocket(const std::shared_ptr<ClientSocket>& newSocket)
{
std::lock_guard<std::mutex> lock(_mutex);
@@ -422,10 +446,10 @@ private:
/// main-loop wakeup pipe
int _wakeup[2];
/// The sockets we're controlling
- std::vector<std::shared_ptr<Socket>> _pollSockets;
+ std::vector<std::shared_ptr<ClientSocket>> _pollSockets;
/// Protects _newSockets
std::mutex _mutex;
- std::vector<std::shared_ptr<Socket>> _newSockets;
+ std::vector<std::shared_ptr<ClientSocket>> _newSockets;
/// The fds to poll.
std::vector<pollfd> _pollFds;
};
@@ -468,7 +492,7 @@ SocketAddress addr("127.0.0.1", PortNumber);
void client(const int timeoutMs)
{
- const auto client = std::make_shared<Socket>();
+ const auto client = std::make_shared<ClientSocket>();
if (!client->connect(addr, timeoutMs) && errno != EINPROGRESS)
{
const std::string msg = "Failed to call connect. (errno: ";
@@ -498,10 +522,10 @@ void client(const int timeoutMs)
}
}
-void server(SocketPoll<Socket>& poller)
+void server(SocketPoll<ClientSocket>& poller)
{
// Start server.
- auto server = std::make_shared<Socket>();
+ auto server = std::make_shared<ServerSocket>();
if (!server->bind(addr))
{
const std::string msg = "Failed to bind. (errno: ";
@@ -519,7 +543,7 @@ void server(SocketPoll<Socket>& poller)
{
if (server->pollRead(30000))
{
- std::shared_ptr<Socket> clientSocket = server->accept();
+ std::shared_ptr<ClientSocket> clientSocket = server->accept();
if (!clientSocket)
{
const std::string msg = "Failed to accept. (errno: ";
@@ -533,11 +557,11 @@ void server(SocketPoll<Socket>& poller)
}
/// Poll client sockets and do IO.
-void pollAndComm(SocketPoll<Socket>& poller, std::atomic<bool>& stop)
+void pollAndComm(SocketPoll<ClientSocket>& poller, std::atomic<bool>& stop)
{
while (!stop)
{
- poller.poll(5000, [](const std::shared_ptr<Socket>& socket, const int events)
+ poller.poll(5000, [](const std::shared_ptr<ClientSocket>& socket, const int events)
{
if (events & POLLIN)
{
@@ -588,7 +612,7 @@ int main(int argc, const char**)
}
// Used to poll client sockets.
- SocketPoll<Socket> poller;
+ SocketPoll<ClientSocket> poller;
// Start the client polling thread.
Thread threadPoll([&poller](std::atomic<bool>& stop)
More information about the Libreoffice-commits mailing list