[Libreoffice-commits] online.git: loolwsd/LOOLWSD.cpp
Ashod Nakashian
ashod.nakashian at collabora.co.uk
Wed Jan 6 06:23:17 PST 2016
loolwsd/LOOLWSD.cpp | 61 +++++++++++++++++++++++++++++-----------------------
1 file changed, 35 insertions(+), 26 deletions(-)
New commits:
commit ea1415de75156f8989e8bdcc6cfe7c53e08b3fbc
Author: Ashod Nakashian <ashod.nakashian at collabora.co.uk>
Date: Tue Jan 5 22:32:58 2016 -0500
loolwsd: improved HTTP Request Handler
Change-Id: I1ad4359732c7b5ee9fc8743ebc60e1c94c304dcc
Reviewed-on: https://gerrit.libreoffice.org/21158
Reviewed-by: Ashod Nakashian <ashnakash at gmail.com>
Tested-by: Ashod Nakashian <ashnakash at gmail.com>
diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp
index 5f1ad43..259b97e 100644
--- a/loolwsd/LOOLWSD.cpp
+++ b/loolwsd/LOOLWSD.cpp
@@ -244,18 +244,17 @@ public:
// Handler returns false to end.
void SocketProcessor(std::shared_ptr<WebSocket> ws,
HTTPServerResponse& response,
- std::function<bool(const char* data, const int size)> handler)
+ std::function<bool(const char* data, const int size, const bool singleLine)> handler)
{
+ Log::info("Starting Socket Processor.");
+
const Poco::Timespan waitTime(POLL_TIMEOUT);
try
{
- // Loop, receiving WebSocket messages either from the client, or from the child
- // process (to be forwarded to the client).
- int flags;
- int n;
+ int flags = 0;
+ int n = 0;
bool pollTimeout = true;
ws->setReceiveTimeout(0);
-
do
{
char buffer[200000]; //FIXME: Dynamic?
@@ -270,24 +269,27 @@ void SocketProcessor(std::shared_ptr<WebSocket> ws,
}
else if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PONG)
{
- n = 1;
+ continue;
}
else if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE)
{
const std::string firstLine = getFirstLine(buffer, n);
if (firstLine == "eof")
+ {
+ Log::info("Recieved EOF. Finishing.");
break;
-
- StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
+ }
if (firstLine.size() == static_cast<std::string::size_type>(n))
{
- handler(firstLine.c_str(), firstLine.size());
+ handler(firstLine.c_str(), firstLine.size(), true);
}
else
{
// Check if it is a "nextmessage:" and in that case read the large
// follow-up message separately, and handle that only.
+ StringTokenizer tokens(firstLine, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
+
int size;
if (tokens.count() == 2 &&
tokens[0] == "nextmessage:" && getTokenInteger(tokens[1], "size", size) && size > 0)
@@ -297,14 +299,20 @@ void SocketProcessor(std::shared_ptr<WebSocket> ws,
n = ws->receiveFrame(largeBuffer, size, flags);
if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE)
{
- if (!handler(largeBuffer, n))
- n = 0;
+ if (!handler(largeBuffer, n, false))
+ {
+ Log::info("Socket handler flagged for finishing.");
+ break;
+ }
}
}
else
{
- if (!handler(buffer, n))
- n = 0;
+ if (!handler(buffer, n, false))
+ {
+ Log::info("Socket handler flagged for finishing.");
+ break;
+ }
}
}
}
@@ -312,6 +320,8 @@ void SocketProcessor(std::shared_ptr<WebSocket> ws,
}
while (!TerminationFlag &&
(!pollTimeout || (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) != WebSocket::FRAME_OP_CLOSE)));
+ Log::debug() << "Finishing SocketProcessor. TerminationFlag: " << TerminationFlag
+ << ", pollTimeout: " << pollTimeout << ", payload size: " << n << Log::end;
}
catch (const WebSocketException& exc)
{
@@ -330,6 +340,8 @@ void SocketProcessor(std::shared_ptr<WebSocket> ws,
break;
}
}
+
+ Log::info("Finished Socket Processor.");
}
@@ -479,14 +491,15 @@ public:
handler.setSession(session);
queueHandlerThread.start(handler);
- SocketProcessor(ws, response, [&session, &queue](const char* data, const int size)
+ SocketProcessor(ws, response, [&session, &queue](const char* data, const int size, const bool singleLine)
{
- const std::string firstLine = getFirstLine(data, size);
- if (firstLine == "eof")
- return false;
-
- if (firstLine.size() == static_cast<std::string::size_type>(size))
+ // FIXME: There is a race here when a request A gets in the queue and
+ // is processed _after_ a later request B, because B gets processed
+ // synchronously and A is waiting in the queue thread.
+ // Fix is to push everything into the queue.
+ if (singleLine)
{
+ const std::string firstLine = getFirstLine(data, size);
queue.put(firstLine);
return true;
}
@@ -533,15 +546,11 @@ public:
{
auto ws = std::make_shared<WebSocket>(request, response);
- std::string id;
+ const std::string id;
auto session = std::make_shared<MasterProcessSession>(id, LOOLSession::Kind::ToPrisoner, ws);
- SocketProcessor(ws, response, [&session](const char* data, const int size)
+ SocketProcessor(ws, response, [&session](const char* data, const int size, bool)
{
- const std::string firstLine = getFirstLine(data, size);
- if (firstLine == "eof")
- return false;
-
return session->handleInput(data, size);
});
}
More information about the Libreoffice-commits
mailing list