[Spice-devel] [PATCH 1/3] vdagent: add message_queue for messages written to pipe
Alon Levy
alevy at redhat.com
Thu Sep 13 06:52:26 PDT 2012
> This is only part of the message corruption solution.
> The other part is fixing virtio-serial / spice-qemu-char throttling
> code.
>
> -replace write_[lock/unlock/completion] calls with
> [new/enqueue]_message
> -remove clipboard specific _out_msg_* class members
> -remove ugly loop - while (a->_out_msg && a->write_clipboard());
> -add _message_mutex for message queue
> -fix pending_write race using _write_mutex
> -TODO: enqueue large message without dividing it to chunks in advance
>
ACK.
> rhbz #846427
> ---
> vdagent/vdagent.cpp | 186
> +++++++++++++++++++++++++--------------------------
> 1 files changed, 91 insertions(+), 95 deletions(-)
>
> diff --git a/vdagent/vdagent.cpp b/vdagent/vdagent.cpp
> index b8bad44..3ffafe3 100644
> --- a/vdagent/vdagent.cpp
> +++ b/vdagent/vdagent.cpp
> @@ -94,10 +94,10 @@ private:
> enum { CONTROL_STOP, CONTROL_DESKTOP_SWITCH };
> void set_control_event(int control_command);
> void handle_control_event();
> - uint8_t* write_lock(DWORD bytes = 0);
> - void write_unlock(DWORD bytes = 0);
> + VDPipeMessage* new_message(DWORD bytes = 0);
> + void enqueue_message(VDPipeMessage* msg);
> bool write_message(uint32_t type, uint32_t size, void* data);
> - bool write_clipboard();
> + bool write_clipboard(VDAgentMessage* msg, uint32_t size);
> bool connect_pipe();
> bool send_input();
> void set_display_depth(uint32_t depth);
> @@ -119,9 +119,6 @@ private:
> HANDLE _clipboard_event;
> VDAgentMessage* _in_msg;
> uint32_t _in_msg_pos;
> - VDAgentMessage* _out_msg;
> - uint32_t _out_msg_pos;
> - uint32_t _out_msg_size;
> bool _pending_input;
> bool _pending_write;
> bool _running;
> @@ -131,7 +128,9 @@ private:
> VDPipeState _pipe_state;
> mutex_t _write_mutex;
> mutex_t _control_mutex;
> + mutex_t _message_mutex;
> std::queue<int> _control_queue;
> + std::queue<VDPipeMessage*> _message_queue;
>
> bool _logon_desktop;
> bool _display_setting_initialized;
> @@ -167,9 +166,6 @@ VDAgent::VDAgent()
> , _clipboard_event (NULL)
> , _in_msg (NULL)
> , _in_msg_pos (0)
> - , _out_msg (NULL)
> - , _out_msg_pos (0)
> - , _out_msg_size (0)
> , _pending_input (false)
> , _pending_write (false)
> , _running (false)
> @@ -193,6 +189,7 @@ VDAgent::VDAgent()
> ZeroMemory(&_pipe_state, sizeof(VDPipeState));
> MUTEX_INIT(_write_mutex);
> MUTEX_INIT(_control_mutex);
> + MUTEX_INIT(_message_mutex);
>
> _singleton = this;
> }
> @@ -538,7 +535,7 @@ bool
> VDAgent::handle_mon_config(VDAgentMonitorsConfig* mon_config,
> uint32_t port
> }
>
> DWORD msg_size = VD_MESSAGE_HEADER_SIZE + sizeof(VDAgentReply);
> - reply_pipe_msg = (VDPipeMessage*)write_lock(msg_size);
> + reply_pipe_msg = new_message(msg_size);
> if (!reply_pipe_msg) {
> return false;
> }
> @@ -553,10 +550,7 @@ bool
> VDAgent::handle_mon_config(VDAgentMonitorsConfig* mon_config,
> uint32_t port
> reply = (VDAgentReply*)reply_msg->data;
> reply->type = VD_AGENT_MONITORS_CONFIG;
> reply->error = display_count ? VD_AGENT_SUCCESS :
> VD_AGENT_ERROR;
> - write_unlock(msg_size);
> - if (!_pending_write) {
> - write_completion(0, 0, &_pipe_state.write.overlap);
> - }
> + enqueue_message(reply_pipe_msg);
> return true;
> }
>
> @@ -669,7 +663,7 @@ bool VDAgent::send_announce_capabilities(bool
> request)
> uint32_t internal_msg_size = sizeof(VDAgentAnnounceCapabilities)
> + VD_AGENT_CAPS_BYTES;
>
> msg_size = VD_MESSAGE_HEADER_SIZE + internal_msg_size;
> - caps_pipe_msg = (VDPipeMessage*)write_lock(msg_size);
> + caps_pipe_msg = new_message(msg_size);
> if (!caps_pipe_msg) {
> return false;
> }
> @@ -694,10 +688,7 @@ bool VDAgent::send_announce_capabilities(bool
> request)
> for (uint32_t i = 0 ; i < caps_size; ++i) {
> vd_printf("%X", caps->caps[i]);
> }
> - write_unlock(msg_size);
> - if (!_pending_write) {
> - write_completion(0, 0, &_pipe_state.write.overlap);
> - }
> + enqueue_message(caps_pipe_msg);
> return true;
> }
>
> @@ -750,11 +741,10 @@ bool
> VDAgent::handle_display_config(VDAgentDisplayConfig* display_config,
> uint32
> }
>
> msg_size = VD_MESSAGE_HEADER_SIZE + sizeof(VDAgentReply);
> - reply_pipe_msg = (VDPipeMessage*)write_lock(msg_size);
> + reply_pipe_msg = new_message(msg_size);
> if (!reply_pipe_msg) {
> return false;
> }
> -
> reply_pipe_msg->type = VD_AGENT_COMMAND;
> reply_pipe_msg->opaque = port;
> reply_pipe_msg->size = sizeof(VDAgentMessage) +
> sizeof(VDAgentReply);
> @@ -766,10 +756,7 @@ bool
> VDAgent::handle_display_config(VDAgentDisplayConfig* display_config,
> uint32
> reply = (VDAgentReply*)reply_msg->data;
> reply->type = VD_AGENT_DISPLAY_CONFIG;
> reply->error = VD_AGENT_SUCCESS;
> - write_unlock(msg_size);
> - if (!_pending_write) {
> - write_completion(0, 0, &_pipe_state.write.overlap);
> - }
> + enqueue_message(reply_pipe_msg);
> return true;
> }
>
> @@ -778,16 +765,13 @@ bool VDAgent::handle_control(VDPipeMessage*
> msg)
> switch (msg->type) {
> case VD_AGENT_RESET: {
> vd_printf("Agent reset");
> - VDPipeMessage* ack =
> (VDPipeMessage*)write_lock(sizeof(VDPipeMessage));
> + VDPipeMessage* ack = new_message(sizeof(VDPipeMessage));
> if (!ack) {
> return false;
> }
> ack->type = VD_AGENT_RESET_ACK;
> ack->opaque = msg->opaque;
> - write_unlock(sizeof(VDPipeMessage));
> - if (!_pending_write) {
> - write_completion(0, 0, &_pipe_state.write.overlap);
> - }
> + enqueue_message(ack);
> break;
> }
> case VD_AGENT_SESSION_LOGON:
> @@ -816,30 +800,30 @@ bool VDAgent::handle_control(VDPipeMessage*
> msg)
>
> //FIXME: division to max size chunks should NOT be here, but in the
> service
> // here we should write the max possible size to the pipe
> -bool VDAgent::write_clipboard()
> +bool VDAgent::write_clipboard(VDAgentMessage* msg, uint32_t size)
> {
> - ASSERT(_out_msg);
> - DWORD n = MIN(sizeof(VDPipeMessage) + _out_msg_size -
> _out_msg_pos, VD_AGENT_MAX_DATA_SIZE);
> - VDPipeMessage* pipe_msg = (VDPipeMessage*)write_lock(n);
> - if (!pipe_msg) {
> - return false;
> - }
> - pipe_msg->type = VD_AGENT_COMMAND;
> - pipe_msg->opaque = VDP_CLIENT_PORT;
> - pipe_msg->size = n - sizeof(VDPipeMessage);
> - memcpy(pipe_msg->data, (char*)_out_msg + _out_msg_pos, n -
> sizeof(VDPipeMessage));
> - write_unlock(n);
> - if (!_pending_write) {
> - write_completion(0, 0, &_pipe_state.write.overlap);
> - }
> - _out_msg_pos += (n - sizeof(VDPipeMessage));
> - if (_out_msg_pos == _out_msg_size) {
> - delete[] (uint8_t *)_out_msg;
> - _out_msg = NULL;
> - _out_msg_size = 0;
> - _out_msg_pos = 0;
> - }
> - return true;
> + uint32_t pos = 0;
> + bool ret = true;
> +
> + ASSERT(msg && size);
> + //FIXME: do it smarter - no loop, no memcopy
> + MUTEX_LOCK(_message_mutex);
> + while (pos < size) {
> + DWORD n = MIN(sizeof(VDPipeMessage) + size - pos,
> VD_AGENT_MAX_DATA_SIZE);
> + VDPipeMessage* pipe_msg = new_message(n);
> + if (!pipe_msg) {
> + ret = false;
> + break;
> + }
> + pipe_msg->type = VD_AGENT_COMMAND;
> + pipe_msg->opaque = VDP_CLIENT_PORT;
> + pipe_msg->size = n - sizeof(VDPipeMessage);
> + memcpy(pipe_msg->data, (char*)msg + pos, n -
> sizeof(VDPipeMessage));
> + enqueue_message(pipe_msg);
> + pos += (n - sizeof(VDPipeMessage));
> + }
> + MUTEX_UNLOCK(_message_mutex);
> + return ret;
> }
>
> bool VDAgent::write_message(uint32_t type, uint32_t size = 0, void*
> data = NULL)
> @@ -847,7 +831,7 @@ bool VDAgent::write_message(uint32_t type,
> uint32_t size = 0, void* data = NULL)
> VDPipeMessage* pipe_msg;
> VDAgentMessage* msg;
>
> - pipe_msg = (VDPipeMessage*)write_lock(VD_MESSAGE_HEADER_SIZE +
> size);
> + pipe_msg = new_message(VD_MESSAGE_HEADER_SIZE + size);
> if (!pipe_msg) {
> return false;
> }
> @@ -862,10 +846,7 @@ bool VDAgent::write_message(uint32_t type,
> uint32_t size = 0, void* data = NULL)
> if (size && data) {
> memcpy(msg->data, data, size);
> }
> - write_unlock(VD_MESSAGE_HEADER_SIZE + size);
> - if (!_pending_write) {
> - write_completion(0, 0, &_pipe_state.write.overlap);
> - }
> + enqueue_message(pipe_msg);
> return true;
> }
>
> @@ -993,6 +974,8 @@ bool
> VDAgent::handle_clipboard_grab(VDAgentClipboardGrab* clipboard_grab,
> uint32
> // VD_AGENT_CLIPBOARD_NONE and no data, so the client will know the
> request failed.
> bool VDAgent::handle_clipboard_request(VDAgentClipboardRequest*
> clipboard_request)
> {
> + VDAgentMessage* msg;
> + uint32_t msg_size;
> UINT format;
> HANDLE clip_data;
> uint8_t* new_data = NULL;
> @@ -1008,10 +991,6 @@ bool
> VDAgent::handle_clipboard_request(VDAgentClipboardRequest*
> clipboard_reques
> vd_printf("Unsupported clipboard type %u",
> clipboard_request->type);
> return false;
> }
> - if (_out_msg) {
> - vd_printf("clipboard change is already pending");
> - return false;
> - }
> if (!IsClipboardFormatAvailable(format) ||
> !OpenClipboard(_hwnd)) {
> return false;
> }
> @@ -1047,14 +1026,13 @@ bool
> VDAgent::handle_clipboard_request(VDAgentClipboardRequest*
> clipboard_reques
> CloseClipboard();
> return false;
> }
> - _out_msg_pos = 0;
> - _out_msg_size = sizeof(VDAgentMessage) +
> sizeof(VDAgentClipboard) + new_size;
> - _out_msg = (VDAgentMessage*)new uint8_t[_out_msg_size];
> - _out_msg->protocol = VD_AGENT_PROTOCOL;
> - _out_msg->type = VD_AGENT_CLIPBOARD;
> - _out_msg->opaque = 0;
> - _out_msg->size = (uint32_t)(sizeof(VDAgentClipboard) +
> new_size);
> - VDAgentClipboard* clipboard = (VDAgentClipboard*)_out_msg->data;
> + msg_size = sizeof(VDAgentMessage) + sizeof(VDAgentClipboard) +
> new_size;
> + msg = (VDAgentMessage*)new uint8_t[msg_size];
> + msg->protocol = VD_AGENT_PROTOCOL;
> + msg->type = VD_AGENT_CLIPBOARD;
> + msg->opaque = 0;
> + msg->size = (uint32_t)(sizeof(VDAgentClipboard) + new_size);
> + VDAgentClipboard* clipboard = (VDAgentClipboard*)msg->data;
> clipboard->type = clipboard_request->type;
>
> switch (clipboard_request->type) {
> @@ -1070,7 +1048,8 @@ bool
> VDAgent::handle_clipboard_request(VDAgentClipboardRequest*
> clipboard_reques
> break;
> }
> CloseClipboard();
> - write_clipboard();
> + write_clipboard(msg, msg_size);
> + delete[] (uint8_t *)msg;
> return true;
> }
>
> @@ -1281,8 +1260,8 @@ VOID CALLBACK VDAgent::write_completion(DWORD
> err, DWORD bytes, LPOVERLAPPED ove
> {
> VDAgent* a = _singleton;
> VDPipeState* ps = &a->_pipe_state;
> + DWORD size_left;
>
> - a->_pending_write = false;
> if (!a->_running) {
> return;
> }
> @@ -1291,40 +1270,57 @@ VOID CALLBACK VDAgent::write_completion(DWORD
> err, DWORD bytes, LPOVERLAPPED ove
> a->_running = false;
> return;
> }
> - if (!a->write_lock()) {
> - a->_running = false;
> - return;
> - }
> + MUTEX_LOCK(a->_write_mutex);
> ps->write.start += bytes;
> if (ps->write.start == ps->write.end) {
> ps->write.start = ps->write.end = 0;
> - //DEBUG
> - while (a->_out_msg && a->write_clipboard());
> - } else if (WriteFileEx(ps->pipe, ps->write.data +
> ps->write.start,
> - ps->write.end - ps->write.start, overlap,
> write_completion)) {
> - a->_pending_write = true;
> + }
> +
> + MUTEX_LOCK(a->_message_mutex);
> + size_left = sizeof(a->_pipe_state.write.data) -
> a->_pipe_state.write.end;
> + while (!a->_message_queue.empty()) {
> + VDPipeMessage* msg = a->_message_queue.front();
> + DWORD size = sizeof(VDPipeMessage) + msg->size;
> +
> + if (size > size_left) {
> + break;
> + }
> + a->_message_queue.pop();
> + memcpy(a->_pipe_state.write.data + a->_pipe_state.write.end,
> msg, size);
> + a->_pipe_state.write.end += size;
> + size_left -= size;
> + delete msg;
> + }
> + MUTEX_UNLOCK(a->_message_mutex);
> +
> + if (ps->write.start < ps->write.end) {
> + if (WriteFileEx(ps->pipe, ps->write.data + ps->write.start,
> + ps->write.end - ps->write.start,
> overlap, write_completion)) {
> + a->_pending_write = true;
> + } else {
> + vd_printf("WriteFileEx() failed: %lu", GetLastError());
> + a->_running = false;
> + }
> } else {
> - vd_printf("WriteFileEx() failed: %lu", GetLastError());
> - a->_running = false;
> + a->_pending_write = false;
> }
> - a->write_unlock();
> + MUTEX_UNLOCK(a->_write_mutex);
> }
>
> -uint8_t* VDAgent::write_lock(DWORD bytes)
> +VDPipeMessage* VDAgent::new_message(DWORD bytes)
> {
> - MUTEX_LOCK(_write_mutex);
> - if (_pipe_state.write.end + bytes <=
> sizeof(_pipe_state.write.data)) {
> - return &_pipe_state.write.data[_pipe_state.write.end];
> - } else {
> - MUTEX_UNLOCK(_write_mutex);
> - vd_printf("write buffer is full");
> - return NULL;
> - }
> + return (VDPipeMessage*)(new char[bytes]);
> }
>
> -void VDAgent::write_unlock(DWORD bytes)
> +void VDAgent::enqueue_message(VDPipeMessage* msg)
> {
> - _pipe_state.write.end += bytes;
> + MUTEX_LOCK(_message_mutex);
> + _message_queue.push(msg);
> + MUTEX_UNLOCK(_message_mutex);
> + MUTEX_LOCK(_write_mutex);
> + if (!_pending_write) {
> + write_completion(0, 0, &_pipe_state.write.overlap);
> + }
> MUTEX_UNLOCK(_write_mutex);
> }
>
> --
> 1.7.4.1
>
> _______________________________________________
> Spice-devel mailing list
> Spice-devel at lists.freedesktop.org
> http://lists.freedesktop.org/mailman/listinfo/spice-devel
>
More information about the Spice-devel
mailing list