[Spice-devel] [PATCH spice-gtk V4 1/3] file-xfer: handling various transfer messages in main channel
Marc-André Lureau
marcandre.lureau at gmail.com
Mon Dec 31 09:51:07 PST 2012
Hi!
On Fri, Dec 28, 2012 at 2:33 PM, Dunrong Huang <riegamaths at gmail.com> wrote:
> Agent channel is a flow-control channel. That means before
> we send agent data to server, we must obtain tokens distributed
> from spice server, if we do not do that, spice server will get error,
> or at least, the data will be discarded.
>
> Other type of agent data will be cached to agent_msg_queue if there
> are no more tokens. But for file-xfer data, if we cache too much of
> those data, our memory will be exhausted pretty quickly if file is
> too big.
>
> We also should make other agent data(clipboard, mouse, ...) get
> through when file-xfer data are sending.
>
> So, for the reason of above, we can not fill file-xfer data to agent queue
> too quickly, we must consider the tokens, and other messages.
>
> Marc-André suggested me to call spice_channel_flush_async() and wait the
> queued data to be sent, but the API does not consider the available
> tokens, so I use a new algorithm/API(file_xfer_flush_async) based on
> spice_channel_flush_async() to send file-xfer data.
>
>
Right
> gtk/channel-main.c | 476
> ++++++++++++++++++++++++++++++++++++++++++++++++
> gtk/channel-main.h | 8 +
> gtk/map-file | 1 +
> gtk/spice-glib-sym-file | 1 +
> 4 files changed, 486 insertions(+)
>
> diff --git a/gtk/channel-main.c b/gtk/channel-main.c
> index 6b9ba8d..b1496bd 100644
> --- a/gtk/channel-main.c
> +++ b/gtk/channel-main.c
> @@ -18,6 +18,7 @@
> #include <math.h>
> #include <spice/vd_agent.h>
> #include <common/rect.h>
> +#include <glib/gstdio.h>
>
> #include "glib-compat.h"
> #include "spice-client.h"
> @@ -51,6 +52,24 @@
>
> typedef struct spice_migrate spice_migrate;
>
> +#define FILE_XFER_CHUNK_SIZE (VD_AGENT_MAX_DATA_SIZE * 32)
> +typedef struct SpiceFileXferTask {
> + uint32_t id;
> + uint32_t group_id;
> + GFile *file;
> + SpiceMainChannel *channel;
> + GFileInputStream *file_stream;
> + GFileCopyFlags flags;
> + GCancellable *cancellable;
> + GFileProgressCallback progress_callback;
> + gpointer progress_callback_data;
> + GAsyncReadyCallback callback;
> + gpointer user_data;
> + char buffer[FILE_XFER_CHUNK_SIZE];
> + uint64_t read_bytes;
> + uint64_t file_size;
> +} SpiceFileXferTask;
> +
> struct _SpiceMainChannelPrivate {
> enum SpiceMouseMode mouse_mode;
> bool agent_connected;
> @@ -79,6 +98,8 @@ struct _SpiceMainChannelPrivate {
> } display[MAX_DISPLAY];
> gint timer_id;
> GQueue *agent_msg_queue;
> + GList *file_xfer_task_list;
> + GSList *flushing;
>
> guint switch_host_delayed_id;
> guint migrate_delayed_id;
> @@ -802,6 +823,63 @@ static void agent_free_msg_queue(SpiceMainChannel
> *channel)
> c->agent_msg_queue = NULL;
> }
>
> +/* Here, flushing algorithm is stolen from spice-channel.c */
> +static void
> +file_xfer_flushed(SpiceMainChannel *channel, gboolean success)
> +{
> + SpiceMainChannelPrivate *c = channel->priv;
> + GSList *l;
> +
> + for (l = c->flushing; l != NULL; l = l->next) {
> + GSimpleAsyncResult *result = G_SIMPLE_ASYNC_RESULT(l->data);
> + g_simple_async_result_set_op_res_gboolean(result, success);
> + g_simple_async_result_complete_in_idle(result);
> + }
> +
> + g_slist_free_full(c->flushing, g_object_unref);
> + c->flushing = NULL;
> +}
> +
> +static void
> +file_xfer_flush_async(SpiceMainChannel *channel, GCancellable
> *cancellable,
> + GAsyncReadyCallback callback, gpointer user_data)
> +{
> + GSimpleAsyncResult *simple;
> + SpiceMainChannelPrivate *c = channel->priv;
> + gboolean was_empty;
> +
> + simple = g_simple_async_result_new(G_OBJECT(channel), callback,
> user_data,
> + file_xfer_flush_async);
> +
> + was_empty = g_queue_is_empty(c->agent_msg_queue);
> + if (was_empty) {
> + g_simple_async_result_set_op_res_gboolean(simple, TRUE);
> + g_simple_async_result_complete_in_idle(simple);
> + return;
> + }
> +
> + c->flushing = g_slist_append(c->flushing, simple);
> +}
> +
> +static gboolean
> +file_xfer_flush_finish(SpiceMainChannel *channel, GAsyncResult *result,
> + GError **error)
> +{
> + GSimpleAsyncResult *simple;
> +
> + simple = (GSimpleAsyncResult *)result;
> +
> + if (g_simple_async_result_propagate_error(simple, error)) {
> + return -1;
> + }
> +
> + g_return_val_if_fail(g_simple_async_result_is_valid(result,
> + G_OBJECT(channel), file_xfer_flush_async), FALSE);
> +
> + CHANNEL_DEBUG(channel, "flushed finished!");
> + return g_simple_async_result_get_op_res_gboolean(simple);
> +}
> +
> /* coroutine context */
> static void agent_send_msg_queue(SpiceMainChannel *channel)
> {
> @@ -814,6 +892,9 @@ static void agent_send_msg_queue(SpiceMainChannel
> *channel)
> out = g_queue_pop_head(c->agent_msg_queue);
> spice_msg_out_send_internal(out);
> }
> + if (g_queue_is_empty(c->agent_msg_queue) && c->flushing != NULL) {
> + file_xfer_flushed(channel, TRUE);
> + }
> }
>
> /* any context: the message is not flushed immediately,
> @@ -1384,6 +1465,203 @@ static void
> main_handle_agent_disconnected(SpiceChannel *channel, SpiceMsgIn *in
> agent_stopped(SPICE_MAIN_CHANNEL(channel));
> }
>
> +static gint file_xfer_task_find(gconstpointer a, gconstpointer b)
> +{
> + SpiceFileXferTask *task = (SpiceFileXferTask *)a;
> + uint32_t id = *(uint32_t *)b;
> +
> + if (task->id == id) {
> + return 0;
> + }
> +
> + return 1;
> +}
> +
> +static void file_read_cb(GObject *source_object,
> + GAsyncResult *res,
> + gpointer user_data);
> +
> +static gboolean report_progress(gpointer user_data)
> +{
> + SpiceFileXferTask *task = user_data;
> + SpiceMainChannelPrivate *c = task->channel->priv;
> +
> + if (task->progress_callback) {
> + uint64_t all_read_bytes = 0, all_bytes = 0;
> + GList *it;
> + for (it = g_list_first(c->file_xfer_task_list);
> + it != NULL; it = g_list_next(it)) {
> + SpiceFileXferTask *t;
> + t = it->data;
> + /* Calculate all remain bytes through group id, NB: we dont
> + * consider the task that has been finished */
> + if (t->group_id == task->id) {
> + all_read_bytes += t->read_bytes;
> + all_bytes += t->file_size;
> + }
> + }
> + task->progress_callback(all_read_bytes, all_bytes,
> + task->progress_callback_data);
> + }
> +
> + return FALSE;
> +}
> +
> +static void data_flushed_cb(GObject *source_object,
> + GAsyncResult *res,
> + gpointer user_data)
> +{
> + SpiceFileXferTask *task = user_data;
> + SpiceMainChannel *channel = (SpiceMainChannel *)source_object;
> + GError *error = NULL;
> +
> + file_xfer_flush_finish(channel, res, &error);
>
Even if the function currently doesn't report an error, you should handle the
error case, at least with a g_warning().
> + /* Report progress */
> + report_progress(task);
> +
> + /* Read more data */
> + g_input_stream_read_async(G_INPUT_STREAM(task->file_stream),
> + task->buffer,
> + FILE_XFER_CHUNK_SIZE,
> + G_PRIORITY_DEFAULT,
> + task->cancellable,
> + file_read_cb,
> + task);
> +}
> +
> +static void
> +file_xfer_queue(SpiceFileXferTask *task, int data_size)
> +{
> + VDAgentFileXferDataMessage *msg;
> + SpiceMainChannel *channel = SPICE_MAIN_CHANNEL(task->channel);
> +
> + msg = g_alloca(sizeof(VDAgentFileXferDataMessage));
> + msg->id = task->id;
> + msg->size = data_size;
> + agent_msg_queue_many(channel, VD_AGENT_FILE_XFER_DATA, msg,
> + sizeof(VDAgentFileXferDataMessage), task->buffer,
> + data_size, NULL);
> + spice_channel_wakeup(SPICE_CHANNEL(channel), FALSE);
> +}
> +
> +static void report_finish(SpiceFileXferTask *task)
> +{
> + if (task->callback) {
> + GSimpleAsyncResult *res;
> + res = g_simple_async_result_new(G_OBJECT(task->file),
> task->callback,
> + task->user_data, report_finish);
> + g_simple_async_result_set_op_res_gboolean(res, TRUE);
> + g_simple_async_result_complete_in_idle(res);
> + g_object_unref(res);
> + }
> +}
> +
> +/* main context */
> +static void
> +file_close_cb(GObject *object,
> + GAsyncResult *res,
> + gpointer user_data)
> +{
> + SpiceFileXferTask *task = user_data;
> + GInputStream *stream = G_INPUT_STREAM(object);
> + SpiceMainChannelPrivate *c = SPICE_MAIN_CHANNEL(task->channel)->priv;
> + GError *error = NULL;
> +
> + g_input_stream_close_finish(stream, res, &error);
> + if (error) {
> + SPICE_DEBUG("close file error: %s", error->message);
> + g_clear_error(&error);
> + }
> +
> + c->file_xfer_task_list = g_list_remove(c->file_xfer_task_list, task);
> +
> + /* If all tasks have been finished, notify to user */
> + if (g_list_length(c->file_xfer_task_list) == 0) {
> + report_finish(task);
> + }
> + g_object_unref(task->file);
> + g_object_unref(task->file_stream);
> + g_free(task);
> +}
> +
> +/* main context */
> +static void file_read_cb(GObject *source_object,
> + GAsyncResult *res,
> + gpointer user_data)
> +{
> + SpiceFileXferTask *task = user_data;
> + SpiceMainChannel *channel = task->channel;
> + gssize count;
> + GError *error = NULL;
> +
> + count = g_input_stream_read_finish(G_INPUT_STREAM(task->file_stream),
> + res, &error);
> + if (count > 0) {
> + task->read_bytes += count;
> + file_xfer_queue(task, count);
> + file_xfer_flush_async(channel, task->cancellable,
> + data_flushed_cb, task);
> + } else {
> + g_input_stream_close_async(G_INPUT_STREAM(task->file_stream),
> + G_PRIORITY_DEFAULT,
> + task->cancellable,
> + file_close_cb,
> + task);
>
You should report the error to the caller (probably with
g_simple_async_report_gerror_in_idle()).
There will be some issues with the fact that there are multiple outstanding
async operations in the background. The common pattern is to _always_ complete
each async() call with exactly one result (successful or error). There
shouldn't be any "lost" async calls, or you may "block" some client execution
path.
> + }
> +}
> +
> +/* coroutine context */
> +static void file_xfer_send_data_msg(SpiceMainChannel *channel, uint32_t
> id)
> +{
> + SpiceMainChannelPrivate *c = channel->priv;
> + GList *l;
> + SpiceFileXferTask *task;
> +
> + l = g_list_find_custom(c->file_xfer_task_list, &id,
> + file_xfer_task_find);
> +
> + g_return_if_fail(l != NULL);
> +
> + task = l->data;
> + g_input_stream_read_async(G_INPUT_STREAM(task->file_stream),
> + task->buffer,
> + FILE_XFER_CHUNK_SIZE,
> + G_PRIORITY_DEFAULT,
> + task->cancellable,
> + file_read_cb,
> + task);
> +}
> +
> +/* coroutine context */
> +static void file_xfer_handle_status(SpiceMainChannel *channel,
> + VDAgentFileXferStatusMessage *msg)
> +{
> + SPICE_DEBUG("task %d received response %d", msg->id, msg->result);
>
You could look up the task only once here.
> +
> + if (msg->result == VD_AGENT_FILE_XFER_STATUS_CAN_SEND_DATA) {
> + file_xfer_send_data_msg(channel, msg->id);
>
and you can call g_input_stream_read_async() directly here, or perhaps move
the read_async() call into a separate function task_continue_read(), which
would also be called from data_flushed_cb().
> + } else {
>
Please check the other msg->result values precisely, and do a
g_warn_if_reached() for unknown values.
> + /* Error, remove this task */
> + SpiceMainChannelPrivate *c = channel->priv;
> + GList *l;
> + SpiceFileXferTask *task;
> +
> + l = g_list_find_custom(c->file_xfer_task_list, &msg->id,
> + file_xfer_task_find);
> + g_return_if_fail(l != NULL);
> +
> + task = l->data;
> + SPICE_DEBUG("user removed task %d, result: %d", msg->id,
> + msg->result);
>
You should report the error to the caller (probably with
g_simple_async_report_gerror_in_idle()).
+ c->file_xfer_task_list = g_list_remove(c->file_xfer_task_list,
> + task);
> + g_object_unref(task->file);
> + g_object_unref(task->file_stream);
> + g_free(task);
Those last 4 lines could probably be in a separate function,
file_xfer_task_free().
> + }
> +}
> +
> /* coroutine context */
> static void main_agent_handle_msg(SpiceChannel *channel,
> VDAgentMessage *msg, gpointer payload)
> @@ -1487,6 +1765,9 @@ static void main_agent_handle_msg(SpiceChannel
> *channel,
> reply->error == VD_AGENT_SUCCESS ? "success" :
> "error");
> break;
> }
> + case VD_AGENT_FILE_XFER_STATUS:
> + file_xfer_handle_status(SPICE_MAIN_CHANNEL(channel), payload);
> + break;
> default:
> g_warning("unhandled agent message type: %u (%s), size %u",
> msg->type, NAME(agent_msg_types, msg->type), msg->size);
> @@ -1563,6 +1844,7 @@ static void main_handle_agent_token(SpiceChannel
> *channel, SpiceMsgIn *in)
> SpiceMainChannelPrivate *c = SPICE_MAIN_CHANNEL(channel)->priv;
>
> c->agent_tokens += tokens->num_tokens;
> +
> agent_send_msg_queue(SPICE_MAIN_CHANNEL(channel));
> }
>
> @@ -2246,3 +2528,197 @@ void
> spice_main_set_display_enabled(SpiceMainChannel *channel, int id, gboolean
> c->display[id].enabled = enabled;
> }
> }
> +
> +static void
> +file_info_async_cb(GObject *obj, GAsyncResult *res, gpointer data)
> +{
> + GFileInfo *info;
> + GFile *file = G_FILE(obj);
> + GError *error = NULL;
> + GKeyFile *keyfile = NULL;
> + gchar *basename = NULL;
> + VDAgentFileXferStartMessage *msg;
> + gsize msg_size, data_len;
> + gchar *string;
> + SpiceFileXferTask *task = (SpiceFileXferTask *)data;
> + SpiceMainChannelPrivate *c = task->channel->priv;
> +
> + info = g_file_query_info_finish(file, res, &error);
> + if (error) {
> + SPICE_DEBUG("couldn't get size of file %s: %s",
> + g_file_get_path(file),
> + error->message);
> + goto failed;
> + }
> + task->file_size = g_file_info_get_attribute_uint64(info,
> + G_FILE_ATTRIBUTE_STANDARD_SIZE);
> +
> + keyfile = g_key_file_new();
> + if (keyfile == NULL) {
> + SPICE_DEBUG("failed to create key file: %s", error->message);
> + goto failed;
> + }
> +
> + /* File name */
> + basename = g_file_get_basename(file);
> + if (basename == NULL) {
> + SPICE_DEBUG("failed to get file basename: %s", error->message);
> + goto failed;
> + }
> + g_key_file_set_string(keyfile, "vdagent-file-xfer", "name", basename);
> + g_free(basename);
> +
> + /* File size */
> + g_key_file_set_uint64(keyfile, "vdagent-file-xfer", "size",
> + task->file_size);
> +
> + /* Save keyfile content to memory. TODO: more file attributions
> + need to be sent to guest */
> + string = g_key_file_to_data(keyfile, &data_len, &error);
> + g_key_file_free(keyfile);
> + if (error) {
> + goto failed;
> + }
> +
> + /* Create file-xfer start message */
> + msg_size = sizeof(VDAgentFileXferStartMessage) + data_len + 1;
> + msg = g_malloc0(msg_size);
>
This allocation and copy seem unnecessary; can you use "string" directly?
> + msg->id = task->id;
> + memcpy(msg->data, string, data_len + 1);
> + g_free(string);
> +
> + CHANNEL_DEBUG(task->channel, "Insert a xfer task:%d to task list",
> + task->id);
> + c->file_xfer_task_list = g_list_append(c->file_xfer_task_list, task);
> +
> + agent_msg_queue(task->channel, VD_AGENT_FILE_XFER_START, msg_size,
> msg);
> + g_free(msg);
> + spice_channel_wakeup(SPICE_CHANNEL(task->channel), FALSE);
> + return ;
> +
> +failed:
> + g_clear_error(&error);
> + g_object_unref(task->file);
> + g_object_unref(task->file_stream);
> + g_free(task);
> +}
> +
> +static void
> +read_async_cb(GObject *obj, GAsyncResult *res, gpointer data)
> +{
> + GFile *file = G_FILE(obj);
> + SpiceFileXferTask *task = (SpiceFileXferTask *)data;
> + GError *error = NULL;
> +
> + task->file_stream = g_file_read_finish(file, res, &error);
> +
> + if (task->file_stream) {
> + g_file_query_info_async(task->file,
> + G_FILE_ATTRIBUTE_STANDARD_SIZE,
> + G_FILE_QUERY_INFO_NONE,
> + G_PRIORITY_DEFAULT,
> + task->cancellable,
> + file_info_async_cb,
> + task);
> + } else {
> + SPICE_DEBUG("create file stream for %s error: %s",
> + g_file_get_path(file), error->message);
> + g_clear_error(&error);
> + g_object_unref(task->file);
> + g_free(task);
>
You should report the error to the caller (probably with
g_simple_async_report_gerror_in_idle()).
> + }
> +}
> +
> +static void
> +file_xfer_send_start_msg_async(SpiceMainChannel *channel,
> + GFile *file,
> + GFileCopyFlags flags,
> + GCancellable *cancellable,
> + GFileProgressCallback progress_callback,
> + gpointer progress_callback_data,
> + GAsyncReadyCallback callback,
> + gpointer user_data,
> + uint32_t group_id)
> +{
> + SpiceFileXferTask *task;
> + static uint32_t xfer_id; /* Used to identify task id */
> +
> + xfer_id = (xfer_id > UINT32_MAX) ? 0 : xfer_id;
> +
> + task = spice_malloc0(sizeof(SpiceFileXferTask));
> + task->id = ++xfer_id;
> + task->group_id = group_id;
> + task->channel = channel;
> + task->file = g_object_ref(file);
> + task->flags = flags;
> + task->cancellable = cancellable;
> + task->progress_callback = progress_callback;
> + task->progress_callback_data = progress_callback_data;
> + task->callback = callback;
> + task->user_data = user_data;
> +
> + g_file_read_async(file,
> + G_PRIORITY_DEFAULT,
> + cancellable,
> + read_async_cb,
> + task);
> +
> +}
> +
> +/**
> + * spice_main_file_copy_async:
> + * @sources: #GFile to be transfer
> + * @flags: set of #GFileCopyFlags
> + * @cancellable: (allow-none): optional #GCancellable object, %NULL to
> ignore
> + * @progress_callback: (allow-none) (scope call): function to callback
> with
> + * progress information, or %NULL if progress information is not
> needed
> + * @progress_callback_data: (closure): user data to pass to
> @progress_callback
> + * @error: #GError to set on error, or %NULL
> + *
> + * Copies the file @sources to guest
> + *
> + * If @cancellable is not %NULL, then the operation can be cancelled by
> + * triggering the cancellable object from another thread. If the operation
> + * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
> + *
> + * If @progress_callback is not %NULL, then the operation can be
> monitored by
> + * setting this to a #GFileProgressCallback function.
> @progress_callback_data
> + * will be passed to this function. It is guaranteed that this callback
> will
> + * be called after all data has been transferred with the total number of
> bytes
> + * copied during the operation.
> + *
> + * When the operation is finished, callback will be called.
> + *
> + **/
> +void spice_main_file_copy_async(SpiceMainChannel *channel,
> + GFile **sources,
> + GFileCopyFlags flags,
> + GCancellable *cancellable,
> + GFileProgressCallback progress_callback,
> + gpointer progress_callback_data,
> + GAsyncReadyCallback callback,
> + gpointer user_data)
> +{
> + int i = 0;
> + static uint32_t xfer_group_id;
> +
> + g_return_if_fail(channel != NULL);
> + g_return_if_fail(SPICE_IS_MAIN_CHANNEL(channel));
> + g_return_if_fail(sources != NULL);
> +
> + xfer_group_id++;
> + xfer_group_id = (xfer_group_id > UINT32_MAX) ? 0 : xfer_group_id;
> + while (sources[i]) {
> + /* All tasks created from below function have same group id */
>
I am worried about how the server side handles several requests sharing the
same group id. But I am okay with this communication pattern, which can be
improved later if needed.
> + file_xfer_send_start_msg_async(channel,
> + sources[i],
> + flags,
> + cancellable,
> + progress_callback,
> + progress_callback_data,
> + callback,
> + user_data,
> + xfer_group_id);
> + i++;
> + }
> +}
> diff --git a/gtk/channel-main.h b/gtk/channel-main.h
> index 1a5ab54..d00490f 100644
> --- a/gtk/channel-main.h
> +++ b/gtk/channel-main.h
> @@ -78,6 +78,14 @@ void
> spice_main_clipboard_selection_notify(SpiceMainChannel *channel, guint sele
> void spice_main_clipboard_selection_request(SpiceMainChannel *channel,
> guint selection, guint32 type);
>
> gboolean spice_main_agent_test_capability(SpiceMainChannel *channel,
> guint32 cap);
> +void spice_main_file_copy_async(SpiceMainChannel *channel,
> + GFile **sources,
> + GFileCopyFlags flags,
> + GCancellable *cancellable,
> + GFileProgressCallback progress_callback,
> + gpointer progress_callback_data,
> + GAsyncReadyCallback callback,
> + gpointer user_data);
>
> #ifndef SPICE_DISABLE_DEPRECATED
> SPICE_DEPRECATED_FOR(spice_main_clipboard_selection_grab)
> diff --git a/gtk/map-file b/gtk/map-file
> index 516764c..9988e7d 100644
> --- a/gtk/map-file
> +++ b/gtk/map-file
> @@ -55,6 +55,7 @@ spice_inputs_motion;
> spice_inputs_position;
> spice_inputs_set_key_locks;
> spice_main_agent_test_capability;
> +spice_main_file_copy_async;
> spice_main_channel_get_type;
> spice_main_clipboard_grab;
> spice_main_clipboard_notify;
> diff --git a/gtk/spice-glib-sym-file b/gtk/spice-glib-sym-file
> index 641ff4d..81fedd5 100644
> --- a/gtk/spice-glib-sym-file
> +++ b/gtk/spice-glib-sym-file
> @@ -31,6 +31,7 @@ spice_inputs_motion
> spice_inputs_position
> spice_inputs_set_key_locks
> spice_main_agent_test_capability
> +spice_main_file_copy_async
> spice_main_channel_get_type
> spice_main_clipboard_grab
> spice_main_clipboard_notify
> --
> 1.8.0
>
> _______________________________________________
> Spice-devel mailing list
> Spice-devel at lists.freedesktop.org
> http://lists.freedesktop.org/mailman/listinfo/spice-devel
>
Thanks a lot for your work!
--
Marc-André Lureau
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.freedesktop.org/archives/spice-devel/attachments/20121231/39df9688/attachment-0001.html>
More information about the Spice-devel
mailing list