[Libreoffice-commits] online.git: loolwsd/IoUtil.cpp loolwsd/IoUtil.hpp loolwsd/LOOLWSD.cpp loolwsd/Makefile.am
Ashod Nakashian
ashod.nakashian at collabora.co.uk
Wed Mar 30 01:45:38 UTC 2016
loolwsd/IoUtil.cpp | 165 +++++++++++++++++++++++++++++++++++++++++++++++++++-
loolwsd/IoUtil.hpp | 9 ++
loolwsd/LOOLWSD.cpp | 158 +++----------------------------------------------
loolwsd/Makefile.am | 2
4 files changed, 186 insertions(+), 148 deletions(-)
New commits:
commit b25fe9d88a86bc247f1f3aedbbf7a51c5193d259
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Sun Mar 27 16:06:22 2016 -0400
loolwsd: moved SocketProcessor to IoUtil and generalized more
Change-Id: I527e57d2430e21249cf8cd4867f22fdbbd092b09
Reviewed-on: https://gerrit.libreoffice.org/23637
Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
Tested-by: Ashod Nakashian <ashnakash at gmail.com>
diff --git a/loolwsd/IoUtil.cpp b/loolwsd/IoUtil.cpp
index 7284eeb..cf46e7e 100644
--- a/loolwsd/IoUtil.cpp
+++ b/loolwsd/IoUtil.cpp
@@ -18,16 +18,179 @@
#include <sstream>
#include <string>
-#include <Poco/Exception.h>
+#include <Poco/StringTokenizer.h>
+#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/WebSocket.h>
+#include <Poco/Net/NetException.h>
#include <Poco/Thread.h>
#include "Common.hpp"
+#include "LOOLProtocol.hpp"
#include "IoUtil.hpp"
#include "Util.hpp"
namespace IoUtil
{
+using Poco::Net::WebSocket;
+using Poco::Net::WebSocketException;
+
+// Synchronously process WebSocket requests and dispatch to handler.
+// Handler returns false to end.
+void SocketProcessor(std::shared_ptr<WebSocket> ws,
+ Poco::Net::HTTPServerResponse& response,
+ std::function<bool(const std::vector<char>&)> handler,
+ std::function<bool()> stopPredicate,
+ std::string name,
+ const size_t pollTimeoutMs)
+{
+ if (!name.empty())
+ {
+ name = "[" + name + "] ";
+ }
+
+ Log::info(name + "Starting Socket Processor.");
+
+ // Timeout given is in microseconds.
+ const Poco::Timespan waitTime(pollTimeoutMs * 1000);
+ try
+ {
+ ws->setReceiveTimeout(0);
+
+ int flags = 0;
+ int n = 0;
+ bool stop = false;
+ std::vector<char> payload(READ_BUFFER_SIZE * 100);
+
+ for (;;)
+ {
+ stop = stopPredicate();
+ if (stop)
+ {
+ Log::info(name + "Termination flagged. Finishing.");
+ break;
+ }
+
+ if (!ws->poll(waitTime, Poco::Net::Socket::SELECT_READ))
+ {
+ // Wait some more.
+ continue;
+ }
+
+ payload.resize(payload.capacity());
+ n = ws->receiveFrame(payload.data(), payload.capacity(), flags);
+ if (n >= 0)
+ {
+ payload.resize(n);
+ }
+
+ if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PING)
+ {
+ // Echo back the ping payload as pong.
+ // Technically, we should send back a PONG control frame.
+ // However Firefox (probably) or Node.js (possibly) doesn't
+ // like that and closes the socket when we do.
+ // Echoing the payload as a normal frame works with Firefox.
+ ws->sendFrame(payload.data(), n /*, WebSocket::FRAME_OP_PONG*/);
+ continue;
+ }
+ else if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PONG)
+ {
+ // In case we do send pings in the future.
+ continue;
+ }
+ else if (n <= 0 || ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE))
+ {
+ Log::warn(name + "Connection closed.");
+ break;
+ }
+
+ assert(n > 0);
+
+ const std::string firstLine = LOOLProtocol::getFirstLine(payload);
+ if ((flags & WebSocket::FrameFlags::FRAME_FLAG_FIN) != WebSocket::FrameFlags::FRAME_FLAG_FIN)
+ {
+ // One WS message split into multiple frames.
+ while (true)
+ {
+ char buffer[READ_BUFFER_SIZE * 10];
+ n = ws->receiveFrame(buffer, sizeof(buffer), flags);
+ if (n <= 0 || (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE)
+ {
+ break;
+ }
+
+ payload.insert(payload.end(), buffer, buffer + n);
+ if ((flags & WebSocket::FrameFlags::FRAME_FLAG_FIN) == WebSocket::FrameFlags::FRAME_FLAG_FIN)
+ {
+ // No more frames.
+ break;
+ }
+ }
+ }
+ else
+ {
+ int size = 0;
+ Poco::StringTokenizer tokens(firstLine, " ", Poco::StringTokenizer::TOK_IGNORE_EMPTY | Poco::StringTokenizer::TOK_TRIM);
+ if (tokens.count() == 2 &&
+ tokens[0] == "nextmessage:" && LOOLProtocol::getTokenInteger(tokens[1], "size", size) && size > 0)
+ {
+ // Check if it is a "nextmessage:" and in that case read the large
+ // follow-up message separately, and handle that only.
+ payload.resize(size);
+
+ n = ws->receiveFrame(payload.data(), size, flags);
+ }
+ }
+
+ if (n <= 0 || (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE)
+ {
+ Log::warn(name + "Connection closed.");
+ break;
+ }
+
+ if (firstLine == "eof")
+ {
+ Log::info(name + "Received EOF. Finishing.");
+ break;
+ }
+
+ // Call the handler.
+ if (!handler(payload))
+ {
+ Log::info(name + "Socket handler flagged to finish.");
+ break;
+ }
+ }
+
+ Log::debug() << name << "Finishing SocketProcessor. TerminationFlag: " << stop
+ << ", payload size: " << payload.size()
+ << ", flags: " << std::hex << flags << Log::end;
+ if (!payload.empty())
+ {
+ Log::warn(name + "Last message will not be processed: [" +
+ LOOLProtocol::getAbbreviatedMessage(payload.data(), payload.size()) + "].");
+ }
+ }
+ catch (const WebSocketException& exc)
+ {
+ Log::error("SocketProcessor: WebSocketException: " + exc.message());
+ switch (exc.code())
+ {
+ case WebSocket::WS_ERR_HANDSHAKE_UNSUPPORTED_VERSION:
+ response.set("Sec-WebSocket-Version", WebSocket::WEBSOCKET_VERSION);
+ // fallthrough
+ case WebSocket::WS_ERR_NO_HANDSHAKE:
+ case WebSocket::WS_ERR_HANDSHAKE_NO_VERSION:
+ case WebSocket::WS_ERR_HANDSHAKE_NO_KEY:
+ response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_BAD_REQUEST);
+ response.setContentLength(0);
+ response.send();
+ break;
+ }
+ }
+
+ Log::info(name + "Finished Socket Processor.");
+}
void shutdownWebSocket(std::shared_ptr<Poco::Net::WebSocket> ws)
{
diff --git a/loolwsd/IoUtil.hpp b/loolwsd/IoUtil.hpp
index da3db34..1647e29 100644
--- a/loolwsd/IoUtil.hpp
+++ b/loolwsd/IoUtil.hpp
@@ -21,6 +21,15 @@
namespace IoUtil
{
+ /// Synchronously process WebSocket requests and dispatch to handler.
+ //. Handler returns false to end.
+ void SocketProcessor(std::shared_ptr<Poco::Net::WebSocket> ws,
+ Poco::Net::HTTPServerResponse& response,
+ std::function<bool(const std::vector<char>&)> handler,
+ std::function<bool()> stopPredicate,
+ std::string name = std::string(),
+ const size_t pollTimeoutMs = POLL_TIMEOUT_MS);
+
/// Call WebSocket::shutdown() ignoring Poco::IOException.
void shutdownWebSocket(std::shared_ptr<Poco::Net::WebSocket> ws);
diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp
index 5003a65..650b1e1 100644
--- a/loolwsd/LOOLWSD.cpp
+++ b/loolwsd/LOOLWSD.cpp
@@ -158,7 +158,6 @@ using Poco::TemporaryFile;
using Poco::Thread;
using Poco::ThreadLocal;
using Poco::ThreadPool;
-using Poco::Timespan;
using Poco::URI;
using Poco::Util::Application;
using Poco::Util::HelpFormatter;
@@ -218,147 +217,6 @@ public:
}
};
-// Synchronously process WebSocket requests and dispatch to handler.
-// Handler returns false to end.
-void SocketProcessor(std::shared_ptr<WebSocket> ws,
- HTTPServerResponse& response,
- std::function<bool(const std::vector<char>&)> handler)
-{
- Log::info("Starting Socket Processor.");
-
- const Timespan waitTime(POLL_TIMEOUT_MS * 1000);
- try
- {
- ws->setReceiveTimeout(0);
-
- int flags = 0;
- int n = 0;
- std::vector<char> payload(READ_BUFFER_SIZE * 100);
-
- while (!TerminationFlag &&
- (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE)
- {
- if (!ws->poll(waitTime, Socket::SELECT_READ))
- {
- // Wait some more.
- continue;
- }
-
- payload.resize(payload.capacity());
- n = ws->receiveFrame(payload.data(), payload.capacity(), flags);
- if (n >= 0)
- {
- payload.resize(n);
- }
-
- if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PING)
- {
- // Echo back the ping payload as pong.
- // Technically, we should send back a PONG control frame.
- // However Firefox (probably) or Node.js (possibly) doesn't
- // like that and closes the socket when we do.
- // Echoing the payload as a normal frame works with Firefox.
- ws->sendFrame(payload.data(), n /*, WebSocket::FRAME_OP_PONG*/);
- }
- else if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PONG)
- {
- // In case we do send pings in the future.
- }
- else if (n <= 0 || ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE))
- {
- // Connection closed.
- Log::warn() << "Received " << n
- << " bytes. Connection closed. Flags: "
- << std::hex << flags << Log::end;
- break;
- }
-
- assert(n > 0);
-
- const std::string firstLine = LOOLProtocol::getFirstLine(payload);
- if ((flags & WebSocket::FrameFlags::FRAME_FLAG_FIN) != WebSocket::FrameFlags::FRAME_FLAG_FIN)
- {
- // One WS message split into multiple frames.
- while (true)
- {
- char buffer[READ_BUFFER_SIZE * 10];
- n = ws->receiveFrame(buffer, sizeof(buffer), flags);
- if (n <= 0 || (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE)
- {
- break;
- }
-
- payload.insert(payload.end(), buffer, buffer + n);
- if ((flags & WebSocket::FrameFlags::FRAME_FLAG_FIN) == WebSocket::FrameFlags::FRAME_FLAG_FIN)
- {
- // No more frames.
- break;
- }
- }
- }
- else
- {
- int size = 0;
- StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
- if (tokens.count() == 2 &&
- tokens[0] == "nextmessage:" && getTokenInteger(tokens[1], "size", size) && size > 0)
- {
- // Check if it is a "nextmessage:" and in that case read the large
- // follow-up message separately, and handle that only.
- payload.resize(size);
-
- n = ws->receiveFrame(payload.data(), size, flags);
- }
- }
-
- if (n <= 0 || (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE)
- {
- break;
- }
-
- if (firstLine == "eof")
- {
- Log::info("Received EOF. Finishing.");
- break;
- }
-
- // Call the handler.
- if (!handler(payload))
- {
- Log::info("Socket handler flagged for finishing.");
- }
- }
-
- Log::debug() << "Finishing SocketProcessor. TerminationFlag: " << TerminationFlag
- << ", payload size: " << payload.size()
- << ", flags: " << std::hex << flags << Log::end;
- if (!payload.empty())
- {
- Log::warn("Last message will not be processed: [" + getAbbreviatedMessage(payload.data(), payload.size()) + "].");
- }
- }
- catch (const WebSocketException& exc)
- {
- Log::error("SocketProcessor: WebSocketException: " + exc.message());
- switch (exc.code())
- {
- case WebSocket::WS_ERR_HANDSHAKE_UNSUPPORTED_VERSION:
- response.set("Sec-WebSocket-Version", WebSocket::WEBSOCKET_VERSION);
- // fallthrough
- case WebSocket::WS_ERR_NO_HANDSHAKE:
- case WebSocket::WS_ERR_HANDSHAKE_NO_VERSION:
- case WebSocket::WS_ERR_HANDSHAKE_NO_KEY:
- response.setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST);
- response.setContentLength(0);
- response.send();
- break;
- }
- }
-
- Log::info("Finished Socket Processor.");
-}
-
-
/// Handle a public connection from a client.
class ClientRequestHandler: public HTTPRequestHandler
{
@@ -615,7 +473,8 @@ private:
queueHandlerThread.start(handler);
bool normalShutdown = false;
- SocketProcessor(ws, response, [&session, &queue, &normalShutdown](const std::vector<char>& payload)
+ IoUtil::SocketProcessor(ws, response,
+ [&session, &queue, &normalShutdown](const std::vector<char>& payload)
{
time(&session->_lastMessageTime);
const auto token = LOOLProtocol::getFirstToken(payload);
@@ -629,7 +488,10 @@ private:
}
return true;
- });
+ },
+ []() { return TerminationFlag; },
+ "Client_ws_" + id
+ );
if (docBroker->getSessionsCount() == 1 && !normalShutdown)
{
@@ -826,10 +688,14 @@ public:
lock.unlock();
MasterProcessSession::AvailableChildSessionCV.notify_one();
- SocketProcessor(ws, response, [&session](const std::vector<char>& payload)
+ IoUtil::SocketProcessor(ws, response,
+ [&session](const std::vector<char>& payload)
{
return session->handleInput(payload.data(), payload.size());
- });
+ },
+ []() { return TerminationFlag; },
+ "Child_ws_" + sessionId
+ );
}
catch (const Exception& exc)
{
diff --git a/loolwsd/Makefile.am b/loolwsd/Makefile.am
index fe54269..458ea85 100644
--- a/loolwsd/Makefile.am
+++ b/loolwsd/Makefile.am
@@ -20,7 +20,7 @@ loadtest_SOURCES = LoadTest.cpp Util.cpp LOOLProtocol.cpp
connect_SOURCES = Connect.cpp Util.cpp LOOLProtocol.cpp
-lokitclient_SOURCES = LOKitClient.cpp IoUtil.cpp Util.cpp
+lokitclient_SOURCES = LOKitClient.cpp LOOLProtocol.cpp IoUtil.cpp Util.cpp
broker_shared_sources = ChildProcessSession.cpp $(shared_sources)
More information about the Libreoffice-commits
mailing list