[Spice-devel] [PATCH spice-gtk V5 1/3] file-xfer: handling various transfer messages in main channel
Marc-André Lureau
marcandre.lureau at gmail.com
Wed Jan 9 17:21:10 PST 2013
Hi
On Sat, Jan 5, 2013 at 11:59 AM, Dunrong Huang <riegamaths at gmail.com> wrote:
> V4 -> V5:
> * Improve error reporting
> * Add spice_main_file_copy_finish() function.
> * Some code cleanup. (e.g. file_xfer_task_free())
> * Limit spice_main_file_copy() to a single file.
This last version is pretty close to being mergeable. However, spicy
seems to hang after doing a file copy, and apparently that is partly my
fault, since you copied a bug :) The result needs to be unreffed after
it is completed in idle:
if (was_empty) {
g_simple_async_result_set_op_res_gboolean(simple, TRUE);
g_simple_async_result_complete_in_idle(simple);
+ g_object_unref(simple);
return;
}
Can you confirm the bug/fix and update your patch?
> gtk/channel-main.c | 511 ++++++++++++++++++++++++++++++++++++++++++++++++
> gtk/channel-main.h | 12 ++
> gtk/map-file | 2 +
> gtk/spice-glib-sym-file | 2 +
> 4 files changed, 527 insertions(+)
>
> diff --git a/gtk/channel-main.c b/gtk/channel-main.c
> index 6b9ba8d..e23d67d 100644
> --- a/gtk/channel-main.c
> +++ b/gtk/channel-main.c
> @@ -18,6 +18,7 @@
> #include <math.h>
> #include <spice/vd_agent.h>
> #include <common/rect.h>
> +#include <glib/gstdio.h>
>
> #include "glib-compat.h"
> #include "spice-client.h"
> @@ -51,6 +52,24 @@
>
> typedef struct spice_migrate spice_migrate;
>
> +#define FILE_XFER_CHUNK_SIZE (VD_AGENT_MAX_DATA_SIZE * 32)
> +typedef struct SpiceFileXferTask {
> + uint32_t id;
> + GFile *file;
> + SpiceMainChannel *channel;
> + GFileInputStream *file_stream;
> + GFileCopyFlags flags;
> + GCancellable *cancellable;
> + GFileProgressCallback progress_callback;
> + gpointer progress_callback_data;
> + GAsyncReadyCallback callback;
> + gpointer user_data;
> + char buffer[FILE_XFER_CHUNK_SIZE];
> + uint64_t read_bytes;
> + uint64_t file_size;
> + GError *error;
> +} SpiceFileXferTask;
> +
> struct _SpiceMainChannelPrivate {
> enum SpiceMouseMode mouse_mode;
> bool agent_connected;
> @@ -79,6 +98,8 @@ struct _SpiceMainChannelPrivate {
> } display[MAX_DISPLAY];
> gint timer_id;
> GQueue *agent_msg_queue;
> + GList *file_xfer_task_list;
> + GSList *flushing;
>
> guint switch_host_delayed_id;
> guint migrate_delayed_id;
> @@ -802,6 +823,63 @@ static void agent_free_msg_queue(SpiceMainChannel *channel)
> c->agent_msg_queue = NULL;
> }
>
> +/* Here, flushing algorithm is stolen from spice-channel.c */
> +static void
> +file_xfer_flushed(SpiceMainChannel *channel, gboolean success)
> +{
> + SpiceMainChannelPrivate *c = channel->priv;
> + GSList *l;
> +
> + for (l = c->flushing; l != NULL; l = l->next) {
> + GSimpleAsyncResult *result = G_SIMPLE_ASYNC_RESULT(l->data);
> + g_simple_async_result_set_op_res_gboolean(result, success);
> + g_simple_async_result_complete_in_idle(result);
> + }
> +
> + g_slist_free_full(c->flushing, g_object_unref);
> + c->flushing = NULL;
> +}
> +
> +static void
> +file_xfer_flush_async(SpiceMainChannel *channel, GCancellable *cancellable,
> + GAsyncReadyCallback callback, gpointer user_data)
> +{
> + GSimpleAsyncResult *simple;
> + SpiceMainChannelPrivate *c = channel->priv;
> + gboolean was_empty;
> +
> + simple = g_simple_async_result_new(G_OBJECT(channel), callback, user_data,
> + file_xfer_flush_async);
> +
> + was_empty = g_queue_is_empty(c->agent_msg_queue);
> + if (was_empty) {
> + g_simple_async_result_set_op_res_gboolean(simple, TRUE);
> + g_simple_async_result_complete_in_idle(simple);
> + return;
> + }
> +
> + c->flushing = g_slist_append(c->flushing, simple);
> +}
> +
> +static gboolean
> +file_xfer_flush_finish(SpiceMainChannel *channel, GAsyncResult *result,
> + GError **error)
> +{
> + GSimpleAsyncResult *simple;
> +
> + simple = (GSimpleAsyncResult *)result;
> +
> + if (g_simple_async_result_propagate_error(simple, error)) {
> + return -1;
This should be "return FALSE;" since the function returns a gboolean.
> + }
> +
> + g_return_val_if_fail(g_simple_async_result_is_valid(result,
> + G_OBJECT(channel), file_xfer_flush_async), FALSE);
> +
> + CHANNEL_DEBUG(channel, "flushed finished!");
> + return g_simple_async_result_get_op_res_gboolean(simple);
> +}
> +
> /* coroutine context */
> static void agent_send_msg_queue(SpiceMainChannel *channel)
> {
> @@ -814,6 +892,9 @@ static void agent_send_msg_queue(SpiceMainChannel *channel)
> out = g_queue_pop_head(c->agent_msg_queue);
> spice_msg_out_send_internal(out);
> }
> + if (g_queue_is_empty(c->agent_msg_queue) && c->flushing != NULL) {
> + file_xfer_flushed(channel, TRUE);
> + }
> }
>
> /* any context: the message is not flushed immediately,
> @@ -1384,6 +1465,216 @@ static void main_handle_agent_disconnected(SpiceChannel *channel, SpiceMsgIn *in
> agent_stopped(SPICE_MAIN_CHANNEL(channel));
> }
>
> +static gint file_xfer_task_find(gconstpointer a, gconstpointer b)
> +{
> + SpiceFileXferTask *task = (SpiceFileXferTask *)a;
> + uint32_t id = *(uint32_t *)b;
> +
> + if (task->id == id) {
> + return 0;
> + }
> +
> + return 1;
> +}
> +
> +static void file_read_cb(GObject *source_object,
> + GAsyncResult *res,
> + gpointer user_data);
> +static void file_xfer_continue_read(SpiceFileXferTask *task);
> +
> +static gboolean report_progress(gpointer user_data)
> +{
Make this function return void and take a SpiceFileXferTask * argument; it is called directly and its return value is never used.
> + SpiceFileXferTask *task = user_data;
> +
> + if (task->progress_callback) {
> + task->progress_callback(task->read_bytes, task->file_size,
> + task->progress_callback_data);
> + }
> +
> + return FALSE;
> +}
> +
> +static void data_flushed_cb(GObject *source_object,
> + GAsyncResult *res,
> + gpointer user_data)
> +{
> + SpiceFileXferTask *task = user_data;
> + SpiceMainChannel *channel = (SpiceMainChannel *)source_object;
> + GError *error = NULL;
> +
> + file_xfer_flush_finish(channel, res, &error);
> +
> + if (error != NULL) {
> + g_warning("failed to flush xfer queue: %s", error->message);
> + g_clear_error(&error);
The async operation should probably take the error and complete in this case.
> + }
> +
> + /* Report progress */
> + report_progress(task);
> +
> + /* Read more data */
> + file_xfer_continue_read(task);
> +}
> +
> +static void
> +file_xfer_queue(SpiceFileXferTask *task, int data_size)
> +{
> + VDAgentFileXferDataMessage *msg;
> + SpiceMainChannel *channel = SPICE_MAIN_CHANNEL(task->channel);
> +
> + msg = g_alloca(sizeof(VDAgentFileXferDataMessage));
> + msg->id = task->id;
> + msg->size = data_size;
> + agent_msg_queue_many(channel, VD_AGENT_FILE_XFER_DATA, msg,
> + sizeof(VDAgentFileXferDataMessage), task->buffer,
> + data_size, NULL);
> + spice_channel_wakeup(SPICE_CHANNEL(channel), FALSE);
> +}
> +
> +static void file_xfer_task_free(SpiceFileXferTask *task)
> +{
> + SpiceMainChannelPrivate *c;
> +
> + g_return_if_fail(task != NULL);
> +
> + c = task->channel->priv;
> + c->file_xfer_task_list = g_list_remove(c->file_xfer_task_list,
> + task);
> +
> + if (task->file) {
> + g_object_unref(task->file);
> + }
> + if (task->file_stream) {
> + g_object_unref(task->file_stream);
> + }
Small nit: you may want to use g_clear_object(&foo), which is a bit
shorter and safer.
> + g_free(task);
> +}
> +
> +/* main context */
> +static void
> +file_close_cb(GObject *object,
> + GAsyncResult *close_res,
> + gpointer user_data)
> +{
> + GSimpleAsyncResult *res;
> + SpiceFileXferTask *task;
> + GInputStream *stream = G_INPUT_STREAM(object);
> + GError *error = NULL;
> +
> + stream = G_INPUT_STREAM(object);
> + task = user_data;
> +
> + g_input_stream_close_finish(stream, close_res, &error);
> + if (error) {
> + /* This error dont need to report to user, just print a log */
> + SPICE_DEBUG("close file error: %s", error->message);
> + g_clear_error(&error);
> + }
> +
> + /* Notify to user that files have been transferred or something error
> + happened. */
> + res = g_simple_async_result_new(G_OBJECT(task->channel),
> + task->callback,
> + task->user_data,
> + file_xfer_continue_read);
> + if (task->error) {
> + g_simple_async_result_take_error(res, task->error);
> + g_simple_async_result_set_op_res_gboolean(res, FALSE);
> + } else {
> + g_simple_async_result_set_op_res_gboolean(res, TRUE);
> + }
> + g_simple_async_result_complete_in_idle(res);
> + g_object_unref(res);
> +
> + file_xfer_task_free(task);
> +}
> +
> +/* main context */
> +static void file_read_cb(GObject *source_object,
> + GAsyncResult *res,
> + gpointer user_data)
> +{
> + SpiceFileXferTask *task = user_data;
> + SpiceMainChannel *channel = task->channel;
> + gssize count;
> + GError *error = NULL;
> +
> + count = g_input_stream_read_finish(G_INPUT_STREAM(task->file_stream),
> + res, &error);
> + if (count > 0) {
> + task->read_bytes += count;
> + file_xfer_queue(task, count);
> + file_xfer_flush_async(channel, task->cancellable,
> + data_flushed_cb, task);
> + } else {
> + /* Error or EOF, close the file */
> + if (count == -1) {
> + task->error = error;
> + }
> + g_input_stream_close_async(G_INPUT_STREAM(task->file_stream),
> + G_PRIORITY_DEFAULT,
> + task->cancellable,
> + file_close_cb,
> + task);
> + }
> +}
> +
> +/* coroutine context */
> +static void file_xfer_continue_read(SpiceFileXferTask *task)
> +{
> + g_input_stream_read_async(G_INPUT_STREAM(task->file_stream),
> + task->buffer,
> + FILE_XFER_CHUNK_SIZE,
> + G_PRIORITY_DEFAULT,
> + task->cancellable,
> + file_read_cb,
> + task);
> +}
> +
> +/* coroutine context */
> +static void file_xfer_handle_status(SpiceMainChannel *channel,
> + VDAgentFileXferStatusMessage *msg)
> +{
> + SpiceMainChannelPrivate *c = channel->priv;
> + GList *l;
> + SpiceFileXferTask *task;
> +
> + l = g_list_find_custom(c->file_xfer_task_list, &msg->id,
> + file_xfer_task_find);
> +
> + g_return_if_fail(l != NULL);
> + task = l->data;
> +
> + SPICE_DEBUG("task %d received response %d", msg->id, msg->result);
> +
> + switch (msg->result) {
> + case VD_AGENT_FILE_XFER_STATUS_CAN_SEND_DATA:
> + file_xfer_continue_read(task);
> + return;
> + case VD_AGENT_FILE_XFER_STATUS_CANCELLED:
> + SPICE_DEBUG("user removed task %d, result: %d", msg->id,
> + msg->result);
> + task->error = g_error_new(SPICE_CLIENT_ERROR, SPICE_CLIENT_ERROR_FAILED,
> + "transfer is cancelled by spice agent");
> + break;
> + case VD_AGENT_FILE_XFER_STATUS_ERROR:
> + task->error = g_error_new(SPICE_CLIENT_ERROR, SPICE_CLIENT_ERROR_FAILED,
> + "some errors occurred in the spice agent");
> + break;
> + default:
> + g_warn_if_reached();
> + task->error = g_error_new(SPICE_CLIENT_ERROR, SPICE_CLIENT_ERROR_FAILED,
> + "unhandled status type: %u", msg->result);
> + break;
> + }
> +
> + g_input_stream_close_async(G_INPUT_STREAM(task->file_stream),
> + G_PRIORITY_DEFAULT,
> + task->cancellable,
> + file_close_cb,
> + task);
> +}
> +
> /* coroutine context */
> static void main_agent_handle_msg(SpiceChannel *channel,
> VDAgentMessage *msg, gpointer payload)
> @@ -1487,6 +1778,9 @@ static void main_agent_handle_msg(SpiceChannel *channel,
> reply->error == VD_AGENT_SUCCESS ? "success" : "error");
> break;
> }
> + case VD_AGENT_FILE_XFER_STATUS:
> + file_xfer_handle_status(SPICE_MAIN_CHANNEL(channel), payload);
> + break;
> default:
> g_warning("unhandled agent message type: %u (%s), size %u",
> msg->type, NAME(agent_msg_types, msg->type), msg->size);
> @@ -1563,6 +1857,7 @@ static void main_handle_agent_token(SpiceChannel *channel, SpiceMsgIn *in)
> SpiceMainChannelPrivate *c = SPICE_MAIN_CHANNEL(channel)->priv;
>
> c->agent_tokens += tokens->num_tokens;
> +
> agent_send_msg_queue(SPICE_MAIN_CHANNEL(channel));
> }
>
> @@ -2246,3 +2541,219 @@ void spice_main_set_display_enabled(SpiceMainChannel *channel, int id, gboolean
> c->display[id].enabled = enabled;
> }
> }
> +
> +static void
> +file_info_async_cb(GObject *obj, GAsyncResult *res, gpointer data)
> +{
> + GFileInfo *info;
> + GFile *file = G_FILE(obj);
> + GError *error = NULL;
> + GKeyFile *keyfile = NULL;
> + gchar *basename = NULL;
> + VDAgentFileXferStartMessage *msg;
> + gsize /*msg_size*/ data_len;
> + gchar *string;
> + SpiceFileXferTask *task = (SpiceFileXferTask *)data;
> + SpiceMainChannelPrivate *c = task->channel->priv;
> +
> + info = g_file_query_info_finish(file, res, &error);
> + if (error) {
> + SPICE_DEBUG("couldn't get size of file %s: %s",
> + g_file_get_path(file),
> + error->message);
> + goto failed;
> + }
> + task->file_size = g_file_info_get_attribute_uint64(info,
> + G_FILE_ATTRIBUTE_STANDARD_SIZE);
> +
> + keyfile = g_key_file_new();
> + if (keyfile == NULL) {
> + SPICE_DEBUG("failed to create key file: %s", error->message);
> + goto failed;
> + }
> +
> + /* File name */
> + basename = g_file_get_basename(file);
> + if (basename == NULL) {
> + SPICE_DEBUG("failed to get file basename: %s", error->message);
> + goto failed;
> + }
> + g_key_file_set_string(keyfile, "vdagent-file-xfer", "name", basename);
> + g_free(basename);
> +
> + /* File size */
> + g_key_file_set_uint64(keyfile, "vdagent-file-xfer", "size",
> + task->file_size);
> +
> + /* Save keyfile content to memory. TODO: more file attributions
> + need to be sent to guest */
> + string = g_key_file_to_data(keyfile, &data_len, &error);
> + g_key_file_free(keyfile);
> + if (error) {
> + goto failed;
> + }
> +
> + /* Create file-xfer start message */
> + msg = g_alloca(sizeof(VDAgentFileXferStartMessage));
> + msg->id = task->id;
> +
> + CHANNEL_DEBUG(task->channel, "Insert a xfer task:%d to task list",
> + task->id);
> + c->file_xfer_task_list = g_list_append(c->file_xfer_task_list, task);
> +
> + agent_msg_queue_many(task->channel, VD_AGENT_FILE_XFER_START, msg,
> + sizeof(VDAgentFileXferStartMessage), string,
> + data_len + 1, NULL);
> + g_free(string);
> + spice_channel_wakeup(SPICE_CHANNEL(task->channel), FALSE);
> + return ;
> +
> +failed:
> + g_clear_error(&error);
> + file_xfer_task_free(task);
> +}
> +
> +static void
> +read_async_cb(GObject *obj, GAsyncResult *res, gpointer data)
> +{
> + GFile *file = G_FILE(obj);
> + SpiceFileXferTask *task = (SpiceFileXferTask *)data;
> + GError *error = NULL;
> +
> + task->file_stream = g_file_read_finish(file, res, &error);
> +
> + if (task->file_stream) {
> + g_file_query_info_async(task->file,
> + G_FILE_ATTRIBUTE_STANDARD_SIZE,
> + G_FILE_QUERY_INFO_NONE,
> + G_PRIORITY_DEFAULT,
> + task->cancellable,
> + file_info_async_cb,
> + task);
> + } else {
> + SPICE_DEBUG("create file stream for %s error: %s",
> + g_file_get_path(file), error->message);
> + task->error = error;
> + g_input_stream_close_async(G_INPUT_STREAM(task->file_stream),
> + G_PRIORITY_DEFAULT,
> + task->cancellable,
> + file_close_cb,
> + task);
> + }
> +}
> +
> +static void
> +file_xfer_send_start_msg_async(SpiceMainChannel *channel,
> + GFile *file,
> + GFileCopyFlags flags,
> + GCancellable *cancellable,
> + GFileProgressCallback progress_callback,
> + gpointer progress_callback_data,
> + GAsyncReadyCallback callback,
> + gpointer user_data)
> +{
> + SpiceFileXferTask *task;
> + static uint32_t xfer_id; /* Used to identify task id */
> +
> + xfer_id = (xfer_id > UINT32_MAX) ? 0 : xfer_id;
> +
> + task = spice_malloc0(sizeof(SpiceFileXferTask));
> + task->id = ++xfer_id;
> + task->channel = channel;
> + task->file = g_object_ref(file);
> + task->flags = flags;
> + task->cancellable = cancellable;
> + task->progress_callback = progress_callback;
> + task->progress_callback_data = progress_callback_data;
> + task->callback = callback;
> + task->user_data = user_data;
> +
> + g_file_read_async(file,
> + G_PRIORITY_DEFAULT,
> + cancellable,
> + read_async_cb,
> + task);
> +
> +}
> +
> +/**
> + * spice_main_file_copy_async:
> + * @sources: #GFile to be transfer
> + * @flags: set of #GFileCopyFlags
> + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore
> + * @progress_callback: (allow-none) (scope call): function to callback with
> + * progress information, or %NULL if progress information is not needed
> + * @progress_callback_data: (closure): user data to pass to @progress_callback
> + * @callback: a #GAsyncReadyCallback to call when the request is satisfied
> + * @user_data: the data to pass to callback function
> + *
> + * Copies the file @sources to guest
> + *
> + * If @cancellable is not %NULL, then the operation can be cancelled by
> + * triggering the cancellable object from another thread. If the operation
> + * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
> + *
> + * If @progress_callback is not %NULL, then the operation can be monitored by
> + * setting this to a #GFileProgressCallback function. @progress_callback_data
> + * will be passed to this function. It is guaranteed that this callback will
> + * be called after all data has been transferred with the total number of bytes
> + * copied during the operation.
> + *
> + * When the operation is finished, callback will be called. You can then call
> + * spice_main_file_copy_finish() to get the result of the operation.
> + *
> + **/
> +void spice_main_file_copy_async(SpiceMainChannel *channel,
> + GFile **sources,
> + GFileCopyFlags flags,
> + GCancellable *cancellable,
> + GFileProgressCallback progress_callback,
> + gpointer progress_callback_data,
> + GAsyncReadyCallback callback,
> + gpointer user_data)
> +{
> + g_return_if_fail(channel != NULL);
> + g_return_if_fail(SPICE_IS_MAIN_CHANNEL(channel));
> + g_return_if_fail(sources != NULL && sources[0] != NULL);
> +
> + /* At the moment, the copy() method is limited to a single file,
> + support for copying multi-files will be implemented later. */
> + g_return_if_fail(sources[1] == NULL);
> +
> + file_xfer_send_start_msg_async(channel,
> + sources[0],
> + flags,
> + cancellable,
> + progress_callback,
> + progress_callback_data,
> + callback,
> + user_data);
> +}
> +
> +/**
> + * spice_main_file_copy_finish:
> + * @result: a #GAsyncResult.
> + * @error: a #GError, or %NULL
> + *
> + * Finishes copying the file started with
> + * spice_main_file_copy_async().
> + *
> + * Returns: a %TRUE on success, %FALSE on error.
> + **/
> +gboolean spice_main_file_copy_finish(SpiceMainChannel *channel,
> + GAsyncResult *result,
> + GError **error)
> +{
> + GSimpleAsyncResult *simple;
> +
> + g_return_val_if_fail(SPICE_IS_MAIN_CHANNEL(channel), FALSE);
> + g_return_val_if_fail(result != NULL, FALSE);
> +
> + simple = (GSimpleAsyncResult *)result;
> +
> + if (g_simple_async_result_propagate_error(simple, error)) {
> + return FALSE;
> + }
> +
> + return g_simple_async_result_get_op_res_gboolean(simple);
> +}
> diff --git a/gtk/channel-main.h b/gtk/channel-main.h
> index 1a5ab54..adba0a2 100644
> --- a/gtk/channel-main.h
> +++ b/gtk/channel-main.h
> @@ -78,6 +78,18 @@ void spice_main_clipboard_selection_notify(SpiceMainChannel *channel, guint sele
> void spice_main_clipboard_selection_request(SpiceMainChannel *channel, guint selection, guint32 type);
>
> gboolean spice_main_agent_test_capability(SpiceMainChannel *channel, guint32 cap);
> +void spice_main_file_copy_async(SpiceMainChannel *channel,
> + GFile **sources,
> + GFileCopyFlags flags,
> + GCancellable *cancellable,
> + GFileProgressCallback progress_callback,
> + gpointer progress_callback_data,
> + GAsyncReadyCallback callback,
> + gpointer user_data);
> +
> +gboolean spice_main_file_copy_finish(SpiceMainChannel *channel,
> + GAsyncResult *result,
> + GError **error);
>
> #ifndef SPICE_DISABLE_DEPRECATED
> SPICE_DEPRECATED_FOR(spice_main_clipboard_selection_grab)
> diff --git a/gtk/map-file b/gtk/map-file
> index 516764c..4d05597 100644
> --- a/gtk/map-file
> +++ b/gtk/map-file
> @@ -55,6 +55,8 @@ spice_inputs_motion;
> spice_inputs_position;
> spice_inputs_set_key_locks;
> spice_main_agent_test_capability;
> +spice_main_file_copy_async;
> +spice_main_file_copy_finish;
> spice_main_channel_get_type;
> spice_main_clipboard_grab;
> spice_main_clipboard_notify;
> diff --git a/gtk/spice-glib-sym-file b/gtk/spice-glib-sym-file
> index 641ff4d..28b54af 100644
> --- a/gtk/spice-glib-sym-file
> +++ b/gtk/spice-glib-sym-file
> @@ -31,6 +31,8 @@ spice_inputs_motion
> spice_inputs_position
> spice_inputs_set_key_locks
> spice_main_agent_test_capability
> +spice_main_file_copy_async
> +spice_main_file_copy_finish
> spice_main_channel_get_type
> spice_main_clipboard_grab
> spice_main_clipboard_notify
> --
> 1.8.0
>
> _______________________________________________
> Spice-devel mailing list
> Spice-devel at lists.freedesktop.org
> http://lists.freedesktop.org/mailman/listinfo/spice-devel
Next time should be good!
Thanks
--
Marc-André Lureau
More information about the Spice-devel
mailing list