[Spice-devel] [PATCH 1/3] vdagent: add message_queue for messages written to pipe

Arnon Gilboa agilboa at redhat.com
Mon Sep 10 01:00:54 PDT 2012


This is only part of the message corruption solution.
The other part is fixing virtio-serial / spice-qemu-char throttling code.

-replace write_[lock/unlock/completion] calls with [new/enqueue]_message
-remove clipboard specific _out_msg_* class members
-remove ugly loop - while (a->_out_msg && a->write_clipboard());
-add _message_mutex for message queue
-fix pending_write race using _write_mutex
-TODO: enqueue large message without dividing it to chunks in advance

rhbz #846427
---
 vdagent/vdagent.cpp |  186 +++++++++++++++++++++++++--------------------------
 1 files changed, 91 insertions(+), 95 deletions(-)

diff --git a/vdagent/vdagent.cpp b/vdagent/vdagent.cpp
index b8bad44..3ffafe3 100644
--- a/vdagent/vdagent.cpp
+++ b/vdagent/vdagent.cpp
@@ -94,10 +94,10 @@ private:
     enum { CONTROL_STOP, CONTROL_DESKTOP_SWITCH };
     void set_control_event(int control_command);
     void handle_control_event();
-    uint8_t* write_lock(DWORD bytes = 0);
-    void write_unlock(DWORD bytes = 0);
+    VDPipeMessage* new_message(DWORD bytes = 0);
+    void enqueue_message(VDPipeMessage* msg);
     bool write_message(uint32_t type, uint32_t size, void* data);
-    bool write_clipboard();
+    bool write_clipboard(VDAgentMessage* msg, uint32_t size);
     bool connect_pipe();
     bool send_input();
     void set_display_depth(uint32_t depth);
@@ -119,9 +119,6 @@ private:
     HANDLE _clipboard_event;
     VDAgentMessage* _in_msg;
     uint32_t _in_msg_pos;
-    VDAgentMessage* _out_msg;
-    uint32_t _out_msg_pos;
-    uint32_t _out_msg_size;
     bool _pending_input;
     bool _pending_write;
     bool _running;
@@ -131,7 +128,9 @@ private:
     VDPipeState _pipe_state;
     mutex_t _write_mutex;
     mutex_t _control_mutex;
+    mutex_t _message_mutex;
     std::queue<int> _control_queue;
+    std::queue<VDPipeMessage*> _message_queue;
 
     bool _logon_desktop;
     bool _display_setting_initialized;
@@ -167,9 +166,6 @@ VDAgent::VDAgent()
     , _clipboard_event (NULL)
     , _in_msg (NULL)
     , _in_msg_pos (0)
-    , _out_msg (NULL)
-    , _out_msg_pos (0)
-    , _out_msg_size (0)
     , _pending_input (false)
     , _pending_write (false)
     , _running (false)
@@ -193,6 +189,7 @@ VDAgent::VDAgent()
     ZeroMemory(&_pipe_state, sizeof(VDPipeState));
     MUTEX_INIT(_write_mutex);
     MUTEX_INIT(_control_mutex);
+    MUTEX_INIT(_message_mutex);
 
     _singleton = this;
 }
@@ -538,7 +535,7 @@ bool VDAgent::handle_mon_config(VDAgentMonitorsConfig* mon_config, uint32_t port
     }
 
     DWORD msg_size = VD_MESSAGE_HEADER_SIZE + sizeof(VDAgentReply);
-    reply_pipe_msg = (VDPipeMessage*)write_lock(msg_size);
+    reply_pipe_msg = new_message(msg_size);
     if (!reply_pipe_msg) {
         return false;
     }
@@ -553,10 +550,7 @@ bool VDAgent::handle_mon_config(VDAgentMonitorsConfig* mon_config, uint32_t port
     reply = (VDAgentReply*)reply_msg->data;
     reply->type = VD_AGENT_MONITORS_CONFIG;
     reply->error = display_count ? VD_AGENT_SUCCESS : VD_AGENT_ERROR;
-    write_unlock(msg_size);
-    if (!_pending_write) {
-        write_completion(0, 0, &_pipe_state.write.overlap);
-    }
+    enqueue_message(reply_pipe_msg);
     return true;
 }
 
@@ -669,7 +663,7 @@ bool VDAgent::send_announce_capabilities(bool request)
     uint32_t internal_msg_size = sizeof(VDAgentAnnounceCapabilities) + VD_AGENT_CAPS_BYTES;
 
     msg_size = VD_MESSAGE_HEADER_SIZE + internal_msg_size;
-    caps_pipe_msg = (VDPipeMessage*)write_lock(msg_size);
+    caps_pipe_msg = new_message(msg_size);
     if (!caps_pipe_msg) {
         return false;
     }
@@ -694,10 +688,7 @@ bool VDAgent::send_announce_capabilities(bool request)
     for (uint32_t i = 0 ; i < caps_size; ++i) {
         vd_printf("%X", caps->caps[i]);
     }
-    write_unlock(msg_size);
-    if (!_pending_write) {
-        write_completion(0, 0, &_pipe_state.write.overlap);
-    }
+    enqueue_message(caps_pipe_msg);
     return true;
 }
 
@@ -750,11 +741,10 @@ bool VDAgent::handle_display_config(VDAgentDisplayConfig* display_config, uint32
     }
 
     msg_size = VD_MESSAGE_HEADER_SIZE + sizeof(VDAgentReply);
-    reply_pipe_msg = (VDPipeMessage*)write_lock(msg_size);
+    reply_pipe_msg = new_message(msg_size);
     if (!reply_pipe_msg) {
         return false;
     }
-
     reply_pipe_msg->type = VD_AGENT_COMMAND;
     reply_pipe_msg->opaque = port;
     reply_pipe_msg->size = sizeof(VDAgentMessage) + sizeof(VDAgentReply);
@@ -766,10 +756,7 @@ bool VDAgent::handle_display_config(VDAgentDisplayConfig* display_config, uint32
     reply = (VDAgentReply*)reply_msg->data;
     reply->type = VD_AGENT_DISPLAY_CONFIG;
     reply->error = VD_AGENT_SUCCESS;
-    write_unlock(msg_size);
-    if (!_pending_write) {
-        write_completion(0, 0, &_pipe_state.write.overlap);
-    }
+    enqueue_message(reply_pipe_msg);
     return true;
 }
 
@@ -778,16 +765,13 @@ bool VDAgent::handle_control(VDPipeMessage* msg)
     switch (msg->type) {
     case VD_AGENT_RESET: {
         vd_printf("Agent reset");
-        VDPipeMessage* ack = (VDPipeMessage*)write_lock(sizeof(VDPipeMessage));
+        VDPipeMessage* ack = new_message(sizeof(VDPipeMessage));
         if (!ack) {
             return false;
         }
         ack->type = VD_AGENT_RESET_ACK;
         ack->opaque = msg->opaque;
-        write_unlock(sizeof(VDPipeMessage));
-        if (!_pending_write) {
-            write_completion(0, 0, &_pipe_state.write.overlap);
-        }
+        enqueue_message(ack);
         break;
     }
     case VD_AGENT_SESSION_LOGON:
@@ -816,30 +800,30 @@ bool VDAgent::handle_control(VDPipeMessage* msg)
 
 //FIXME: division to max size chunks should NOT be here, but in the service
 //       here we should write the max possible size to the pipe
-bool VDAgent::write_clipboard()
+bool VDAgent::write_clipboard(VDAgentMessage* msg, uint32_t size)
 {
-    ASSERT(_out_msg);
-    DWORD n = MIN(sizeof(VDPipeMessage) + _out_msg_size - _out_msg_pos, VD_AGENT_MAX_DATA_SIZE);
-    VDPipeMessage* pipe_msg = (VDPipeMessage*)write_lock(n);
-    if (!pipe_msg) {
-        return false;
-    }
-    pipe_msg->type = VD_AGENT_COMMAND;
-    pipe_msg->opaque = VDP_CLIENT_PORT;
-    pipe_msg->size = n - sizeof(VDPipeMessage);
-    memcpy(pipe_msg->data, (char*)_out_msg + _out_msg_pos, n - sizeof(VDPipeMessage));
-    write_unlock(n);
-    if (!_pending_write) {
-        write_completion(0, 0, &_pipe_state.write.overlap);
-    }
-    _out_msg_pos += (n - sizeof(VDPipeMessage));
-    if (_out_msg_pos == _out_msg_size) {
-        delete[] (uint8_t *)_out_msg;
-        _out_msg = NULL;
-        _out_msg_size = 0;
-        _out_msg_pos = 0;
-    }
-    return true;
+    uint32_t pos = 0;
+    bool ret = true;
+
+    ASSERT(msg && size);
+    //FIXME: do it smarter - no loop, no memcopy
+    MUTEX_LOCK(_message_mutex);
+    while (pos < size) {
+        DWORD n = MIN(sizeof(VDPipeMessage) + size - pos, VD_AGENT_MAX_DATA_SIZE);
+        VDPipeMessage* pipe_msg = new_message(n);
+        if (!pipe_msg) {
+            ret = false;
+            break;
+        }
+        pipe_msg->type = VD_AGENT_COMMAND;
+        pipe_msg->opaque = VDP_CLIENT_PORT;
+        pipe_msg->size = n - sizeof(VDPipeMessage);
+        memcpy(pipe_msg->data, (char*)msg + pos, n - sizeof(VDPipeMessage));
+        enqueue_message(pipe_msg);
+        pos += (n - sizeof(VDPipeMessage));
+    }
+    MUTEX_UNLOCK(_message_mutex);
+    return ret;
 }
 
 bool VDAgent::write_message(uint32_t type, uint32_t size = 0, void* data = NULL)
@@ -847,7 +831,7 @@ bool VDAgent::write_message(uint32_t type, uint32_t size = 0, void* data = NULL)
     VDPipeMessage* pipe_msg;
     VDAgentMessage* msg;
 
-    pipe_msg = (VDPipeMessage*)write_lock(VD_MESSAGE_HEADER_SIZE + size);
+    pipe_msg = new_message(VD_MESSAGE_HEADER_SIZE + size);
     if (!pipe_msg) {
         return false;
     }
@@ -862,10 +846,7 @@ bool VDAgent::write_message(uint32_t type, uint32_t size = 0, void* data = NULL)
     if (size && data) {
         memcpy(msg->data, data, size);
     }
-    write_unlock(VD_MESSAGE_HEADER_SIZE + size);
-    if (!_pending_write) {
-        write_completion(0, 0, &_pipe_state.write.overlap);
-    }
+    enqueue_message(pipe_msg);
     return true;
 }
 
@@ -993,6 +974,8 @@ bool VDAgent::handle_clipboard_grab(VDAgentClipboardGrab* clipboard_grab, uint32
 // VD_AGENT_CLIPBOARD_NONE and no data, so the client will know the request failed.
 bool VDAgent::handle_clipboard_request(VDAgentClipboardRequest* clipboard_request)
 {
+    VDAgentMessage* msg;
+    uint32_t msg_size;
     UINT format;
     HANDLE clip_data;
     uint8_t* new_data = NULL;
@@ -1008,10 +991,6 @@ bool VDAgent::handle_clipboard_request(VDAgentClipboardRequest* clipboard_reques
         vd_printf("Unsupported clipboard type %u", clipboard_request->type);
         return false;
     }
-    if (_out_msg) {
-        vd_printf("clipboard change is already pending");
-        return false;
-    }
     if (!IsClipboardFormatAvailable(format) || !OpenClipboard(_hwnd)) {
         return false;
     }
@@ -1047,14 +1026,13 @@ bool VDAgent::handle_clipboard_request(VDAgentClipboardRequest* clipboard_reques
         CloseClipboard();
         return false;
     }
-    _out_msg_pos = 0;
-    _out_msg_size = sizeof(VDAgentMessage) + sizeof(VDAgentClipboard) + new_size;
-    _out_msg = (VDAgentMessage*)new uint8_t[_out_msg_size];
-    _out_msg->protocol = VD_AGENT_PROTOCOL;
-    _out_msg->type = VD_AGENT_CLIPBOARD;
-    _out_msg->opaque = 0;
-    _out_msg->size = (uint32_t)(sizeof(VDAgentClipboard) + new_size);
-    VDAgentClipboard* clipboard = (VDAgentClipboard*)_out_msg->data;
+    msg_size = sizeof(VDAgentMessage) + sizeof(VDAgentClipboard) + new_size;
+    msg = (VDAgentMessage*)new uint8_t[msg_size];
+    msg->protocol = VD_AGENT_PROTOCOL;
+    msg->type = VD_AGENT_CLIPBOARD;
+    msg->opaque = 0;
+    msg->size = (uint32_t)(sizeof(VDAgentClipboard) + new_size);
+    VDAgentClipboard* clipboard = (VDAgentClipboard*)msg->data;
     clipboard->type = clipboard_request->type;
 
     switch (clipboard_request->type) {
@@ -1070,7 +1048,8 @@ bool VDAgent::handle_clipboard_request(VDAgentClipboardRequest* clipboard_reques
         break;
     }
     CloseClipboard();
-    write_clipboard();
+    write_clipboard(msg, msg_size);
+    delete[] (uint8_t *)msg;
     return true;
 }
 
@@ -1281,8 +1260,8 @@ VOID CALLBACK VDAgent::write_completion(DWORD err, DWORD bytes, LPOVERLAPPED ove
 {
     VDAgent* a = _singleton;
     VDPipeState* ps = &a->_pipe_state;
+    DWORD size_left;
 
-    a->_pending_write = false;
     if (!a->_running) {
         return;
     }
@@ -1291,40 +1270,57 @@ VOID CALLBACK VDAgent::write_completion(DWORD err, DWORD bytes, LPOVERLAPPED ove
         a->_running = false;
         return;
     }
-    if (!a->write_lock()) {
-        a->_running = false;
-        return;
-    }
+    MUTEX_LOCK(a->_write_mutex);
     ps->write.start += bytes;
     if (ps->write.start == ps->write.end) {
         ps->write.start = ps->write.end = 0;
-        //DEBUG
-        while (a->_out_msg && a->write_clipboard());
-    } else if (WriteFileEx(ps->pipe, ps->write.data + ps->write.start,
-                           ps->write.end - ps->write.start, overlap, write_completion)) {
-        a->_pending_write = true;
+    }
+
+    MUTEX_LOCK(a->_message_mutex);
+    size_left = sizeof(a->_pipe_state.write.data) - a->_pipe_state.write.end;
+    while (!a->_message_queue.empty()) {
+        VDPipeMessage* msg = a->_message_queue.front();
+        DWORD size = sizeof(VDPipeMessage) + msg->size;
+
+        if (size > size_left) {
+            break;
+        }
+        a->_message_queue.pop();
+        memcpy(a->_pipe_state.write.data + a->_pipe_state.write.end, msg, size);
+        a->_pipe_state.write.end += size;
+        size_left -= size;
+        delete msg;
+    }
+    MUTEX_UNLOCK(a->_message_mutex);
+
+    if (ps->write.start < ps->write.end) {
+        if (WriteFileEx(ps->pipe, ps->write.data + ps->write.start,
+                               ps->write.end - ps->write.start, overlap, write_completion)) {
+            a->_pending_write = true;
+        } else {
+            vd_printf("WriteFileEx() failed: %lu", GetLastError());
+            a->_running = false;
+        }
     } else {
-        vd_printf("WriteFileEx() failed: %lu", GetLastError());
-        a->_running = false;
+        a->_pending_write = false;
     }
-    a->write_unlock();
+    MUTEX_UNLOCK(a->_write_mutex);
 }
 
-uint8_t* VDAgent::write_lock(DWORD bytes)
+VDPipeMessage* VDAgent::new_message(DWORD bytes)
 {
-    MUTEX_LOCK(_write_mutex);
-    if (_pipe_state.write.end + bytes <= sizeof(_pipe_state.write.data)) {
-        return &_pipe_state.write.data[_pipe_state.write.end];
-    } else {
-        MUTEX_UNLOCK(_write_mutex);
-        vd_printf("write buffer is full");
-        return NULL;
-    }
+    return (VDPipeMessage*)(new char[bytes]);
 }
 
-void VDAgent::write_unlock(DWORD bytes)
+void VDAgent::enqueue_message(VDPipeMessage* msg)
 {
-    _pipe_state.write.end += bytes;
+    MUTEX_LOCK(_message_mutex);
+    _message_queue.push(msg);
+    MUTEX_UNLOCK(_message_mutex);
+    MUTEX_LOCK(_write_mutex);
+    if (!_pending_write) {
+        write_completion(0, 0, &_pipe_state.write.overlap);
+    }
     MUTEX_UNLOCK(_write_mutex);
 }
 
-- 
1.7.4.1



More information about the Spice-devel mailing list