[Spice-devel] [spice-gtk v3 06/16] file-xfer: a FileTransferOperation per transfer call

Victor Toso victortoso at redhat.com
Mon May 30 09:55:02 UTC 2016


Each call to spice_main_file_copy_async will now create a
FileTransferOperation which groups all SpiceFileTransferTasks of the
copy operation and also the progress_callback passed from Application.

As pointed in the fix 113093dd00a1cf10f6d3c3589b7589a184cec081, the
progress_callback should provide information of the whole transfer
operation; For that reason, there is no need to keep progress_callback
and progress_callback_data per SpiceFileTransferTask but per
FileTransferOperation.

The file_xfer_tasks hash table now has FileTransferOperation instead
of SpiceFileTransferTask. To improve handling this new operation, I've
created the helpers:

* file_transfer_operation_send_progress
* file_transfer_operation_end
* file_transfer_operation_reset_all
* file_transfer_operation_find_task_by_id
* file_transfer_operation_task_finished

This change is related to split SpiceFileTransferTask from
channel-main.
---
 src/channel-main.c | 206 ++++++++++++++++++++++++++++++++++++-----------------
 1 file changed, 139 insertions(+), 67 deletions(-)

diff --git a/src/channel-main.c b/src/channel-main.c
index 72dcf1f..f36326d 100644
--- a/src/channel-main.c
+++ b/src/channel-main.c
@@ -66,8 +66,6 @@ static GList *spice_file_transfer_task_create_tasks(SpiceMainChannel *channel,
                                                     GFile **files,
                                                     GFileCopyFlags flags,
                                                     GCancellable *cancellable,
-                                                    GFileProgressCallback progress_callback,
-                                                    gpointer progress_callback_data,
                                                     SpiceFileTransferTaskFlushCb flush_callback,
                                                     gpointer flush_callback_data,
                                                     GAsyncReadyCallback callback,
@@ -103,8 +101,6 @@ struct _SpiceFileTransferTask
     GFileInputStream               *file_stream;
     GFileCopyFlags                 flags;
     GCancellable                   *cancellable;
-    GFileProgressCallback          progress_callback;
-    gpointer                       progress_callback_data;
     SpiceFileTransferTaskFlushCb   flush_callback;
     gpointer                       flush_callback_data;
     GAsyncReadyCallback            callback;
@@ -156,6 +152,15 @@ typedef struct {
     SpiceDisplayState       display_state;
 } SpiceDisplayConfig;
 
+typedef struct {
+    GList                      *tasks;
+    SpiceMainChannel           *channel;
+    GFileProgressCallback       progress_callback;
+    gpointer                    progress_callback_data;
+    goffset                     total_sent;
+    goffset                     transfer_size;
+} FileTransferOperation;
+
 struct _SpiceMainChannelPrivate  {
     enum SpiceMouseMode         mouse_mode;
     enum SpiceMouseMode         requested_mouse_mode;
@@ -257,6 +262,13 @@ static void file_xfer_flushed(SpiceMainChannel *channel, gboolean success);
 static void spice_main_set_max_clipboard(SpiceMainChannel *self, gint max);
 static void set_agent_connected(SpiceMainChannel *channel, gboolean connected);
 
+static void file_transfer_operation_send_progress(SpiceMainChannel *channel, SpiceFileTransferTask *xfer_task);
+static void file_transfer_operation_end(FileTransferOperation *xfer_op);
+static void file_transfer_operation_reset_all(SpiceMainChannel *channel);
+static SpiceFileTransferTask *file_transfer_operation_find_task_by_id(SpiceMainChannel *channel,
+                                                                      guint32 task_id);
+static void file_transfer_operation_task_finished(SpiceMainChannel *channel, SpiceFileTransferTask *task);
+
 /* ------------------------------------------------------------------ */
 
 static const char *agent_msg_types[] = {
@@ -315,8 +327,7 @@ static void spice_main_channel_init(SpiceMainChannel *channel)
 
     c = channel->priv = SPICE_MAIN_CHANNEL_GET_PRIVATE(channel);
     c->agent_msg_queue = g_queue_new();
-    c->file_xfer_tasks = g_hash_table_new_full(g_direct_hash, g_direct_equal,
-                                               NULL, g_object_unref);
+    c->file_xfer_tasks = g_hash_table_new(g_direct_hash, g_direct_equal);
     c->flushing = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL,
                                         g_object_unref);
     c->cancellable_volume_info = g_cancellable_new();
@@ -470,9 +481,6 @@ static void spice_channel_iterate_write(SpiceChannel *channel)
 static void spice_main_channel_reset_agent(SpiceMainChannel *channel)
 {
     SpiceMainChannelPrivate *c = channel->priv;
-    GError *error;
-    GList *tasks;
-    GList *l;
 
     c->agent_connected = FALSE;
     c->agent_caps_received = FALSE;
@@ -481,15 +489,7 @@ static void spice_main_channel_reset_agent(SpiceMainChannel *channel)
     g_clear_pointer(&c->agent_msg_data, g_free);
     c->agent_msg_size = 0;
 
-    tasks = g_hash_table_get_values(c->file_xfer_tasks);
-    for (l = tasks; l != NULL; l = l->next) {
-        SpiceFileTransferTask *task = (SpiceFileTransferTask *)l->data;
-
-        error = g_error_new(SPICE_CLIENT_ERROR, SPICE_CLIENT_ERROR_FAILED,
-                            "Agent connection closed");
-        spice_file_transfer_task_completed(task, error);
-    }
-    g_list_free(tasks);
+    file_transfer_operation_reset_all(channel);
     file_xfer_flushed(channel, FALSE);
 }
 
@@ -1878,7 +1878,9 @@ static void file_xfer_data_flushed_cb(GObject *source_object,
     SpiceMainChannel *channel = (SpiceMainChannel *)source_object;
     GError *error = NULL;
 
-    file_xfer_flush_finish(channel, res, &error);
+    if (file_xfer_flush_finish(channel, res, &error))
+        file_transfer_operation_send_progress(channel, self);
+
     spice_file_transfer_task_flush_done(self, error);
 }
 
@@ -1905,7 +1907,9 @@ static void file_xfer_flush_callback(SpiceFileTransferTask *xfer_task,
 {
     SpiceMainChannel *main_channel;
     GCancellable *cancellable;
+    FileTransferOperation *xfer_op = user_data;
 
+    xfer_op->total_sent += count;
     file_xfer_queue(xfer_task, buffer, count);
     if (count == 0)
         return;
@@ -2146,7 +2150,7 @@ static void main_agent_handle_msg(SpiceChannel *channel,
         SpiceFileTransferTask *task;
         VDAgentFileXferStatusMessage *msg = payload;
 
-        task = g_hash_table_lookup(c->file_xfer_tasks, GUINT_TO_POINTER(msg->id));
+        task = file_transfer_operation_find_task_by_id(self, msg->id);
         if (task != NULL) {
             spice_file_transfer_task_handle_status(task, msg);
         } else {
@@ -3042,21 +3046,22 @@ static void file_xfer_on_file_info(SpiceFileTransferTask *xfer_task,
                                    GFileInfo *info,
                                    gpointer data)
 {
-    SpiceMainChannel *channel;
     GKeyFile *keyfile;
     VDAgentFileXferStartMessage msg;
     gchar *string, *basename;
     guint64 file_size;
     gsize data_len;
     GError *error = NULL;
+    FileTransferOperation *xfer_op;
 
     g_return_if_fail(info != NULL);
-
-    channel = SPICE_MAIN_CHANNEL(data);
+    xfer_op = data;
 
     basename = g_file_info_get_attribute_as_string(info, G_FILE_ATTRIBUTE_STANDARD_NAME);
     file_size = g_file_info_get_attribute_uint64(info, G_FILE_ATTRIBUTE_STANDARD_SIZE);
 
+    xfer_op->transfer_size += file_size;
+
     keyfile = g_key_file_new();
     g_key_file_set_string(keyfile, "vdagent-file-xfer", "name", basename);
     g_key_file_set_uint64(keyfile, "vdagent-file-xfer", "size", file_size);
@@ -3072,34 +3077,122 @@ static void file_xfer_on_file_info(SpiceFileTransferTask *xfer_task,
 
     /* Create file-xfer start message */
     msg.id = spice_file_transfer_task_get_id(xfer_task);
-    agent_msg_queue_many(channel, VD_AGENT_FILE_XFER_START,
+    agent_msg_queue_many(xfer_op->channel, VD_AGENT_FILE_XFER_START,
                          &msg, sizeof(msg),
                          string, data_len + 1, NULL);
     g_free(string);
-    spice_channel_wakeup(SPICE_CHANNEL(channel), FALSE);
+    spice_channel_wakeup(SPICE_CHANNEL(xfer_op->channel), FALSE);
     return;
 
 failed:
     spice_file_transfer_task_completed(xfer_task, error);
 }
 
+static void file_transfer_operation_send_progress(SpiceMainChannel *channel, SpiceFileTransferTask *xfer_task)
+{
+    FileTransferOperation *xfer_op;
+    guint32 task_id;
+
+    task_id = spice_file_transfer_task_get_id(xfer_task);
+    xfer_op = g_hash_table_lookup(channel->priv->file_xfer_tasks, GUINT_TO_POINTER(task_id));
+    g_return_if_fail(xfer_op != NULL);
+
+    if (xfer_op->progress_callback)
+        xfer_op->progress_callback(xfer_op->total_sent,
+                                   xfer_op->transfer_size,
+                                   xfer_op->progress_callback_data);
+}
+
+static void file_transfer_operation_end(FileTransferOperation *xfer_op)
+{
+    g_return_if_fail(xfer_op != NULL);
+    spice_debug("Freeing file-transfer-operation %p", xfer_op);
+
+    /* SpiceFileTransferTask itself is freed after it emits "finish" */
+    if (xfer_op->tasks != NULL)
+        g_list_free(xfer_op->tasks);
+
+    g_free(xfer_op);
+}
+
+static void file_transfer_operation_reset_all(SpiceMainChannel *channel)
+{
+    GList *list_op, *it_op;
+
+    list_op = g_hash_table_get_values(channel->priv->file_xfer_tasks);
+    for (it_op = list_op; it_op != NULL; it_op = it_op->next) {
+        FileTransferOperation *op = it_op->data;
+        GList *it;
+        for (it = op->tasks; it != NULL; it = it->next) {
+            SpiceFileTransferTask *xfer_task = SPICE_FILE_TRANSFER_TASK(it->data);
+            GError *error = g_error_new(SPICE_CLIENT_ERROR, SPICE_CLIENT_ERROR_FAILED,
+                                        "Agent connection closed");
+            spice_file_transfer_task_completed(xfer_task, error);
+        }
+    }
+    g_list_free(list_op);
+}
+
+static SpiceFileTransferTask *file_transfer_operation_find_task_by_id(SpiceMainChannel *channel,
+                                                                      guint32 task_id)
+{
+    FileTransferOperation *op;
+    GList *it;
+
+    op = g_hash_table_lookup (channel->priv->file_xfer_tasks, GUINT_TO_POINTER(task_id));
+    g_return_val_if_fail (op != NULL, NULL);
+
+    for (it = op->tasks; it != NULL; it = it->next) {
+        SpiceFileTransferTask *xfer_task = SPICE_FILE_TRANSFER_TASK(it->data);
+        guint32 id = spice_file_transfer_task_get_id(xfer_task);
+        if (id == task_id)
+            return xfer_task;
+    }
+    return NULL;
+}
+
+static void file_transfer_operation_task_finished(SpiceMainChannel *channel, SpiceFileTransferTask *task)
+{
+    FileTransferOperation *xfer_op;
+    guint32 task_id;
+
+    task_id = spice_file_transfer_task_get_id(task);
+    xfer_op = g_hash_table_lookup(channel->priv->file_xfer_tasks, GUINT_TO_POINTER(task_id));
+    if (xfer_op == NULL) {
+        /* Likely the operation has ended before the remove-task was called. One
+         * situation that this can easily happen is if the agent is disconnected
+         * while there are pending files. */
+        g_object_unref(task);
+        return;
+    }
+    /* Remove and free SpiceFileTransferTask */
+    xfer_op->tasks = g_list_remove(xfer_op->tasks, task);
+    g_object_unref(task);
+
+    /* Keep file-xfer-tasks up to date. If no more elements, operation is over */
+    g_hash_table_remove(channel->priv->file_xfer_tasks, GUINT_TO_POINTER(task_id));
+
+    /* No more pending operations */
+    if (xfer_op->tasks == NULL)
+        file_transfer_operation_end(xfer_op);
+}
+
 static void task_finished(SpiceFileTransferTask *task,
                           GError *error,
                           gpointer data)
 {
     SpiceMainChannel *channel = SPICE_MAIN_CHANNEL(data);
-    guint32 task_id = spice_file_transfer_task_get_id(task);
 
     if (error) {
         VDAgentFileXferStatusMessage msg;
-        msg.id = task_id;
+        msg.id = spice_file_transfer_task_get_id(task);
         msg.result = error->code == G_IO_ERROR_CANCELLED ?
                 VD_AGENT_FILE_XFER_STATUS_CANCELLED : VD_AGENT_FILE_XFER_STATUS_ERROR;
         agent_msg_queue_many(channel, VD_AGENT_FILE_XFER_STATUS,
                              &msg, sizeof(msg), NULL);
     }
 
-    g_hash_table_remove(channel->priv->file_xfer_tasks, GUINT_TO_POINTER(task_id));
+    file_transfer_operation_task_finished(channel, task);
 }
 
 /**
@@ -3145,7 +3238,8 @@ void spice_main_file_copy_async(SpiceMainChannel *channel,
                                 gpointer user_data)
 {
     SpiceMainChannelPrivate *c = channel->priv;
-    GList *tasks, *it;
+    GList *it;
+    FileTransferOperation *xfer_op;
 
     g_return_if_fail(channel != NULL);
     g_return_if_fail(SPICE_IS_MAIN_CHANNEL(channel));
@@ -3162,27 +3256,29 @@ void spice_main_file_copy_async(SpiceMainChannel *channel,
         return;
     }
 
-    tasks = spice_file_transfer_task_create_tasks(channel,
-                                                  sources,
-                                                  flags,
-                                                  cancellable,
-                                                  progress_callback,
-                                                  progress_callback_data,
-                                                  file_xfer_flush_callback,
-                                                  NULL,
-                                                  callback,
-                                                  user_data);
-    for (it = tasks; it != NULL; it = it->next) {
+    xfer_op = g_new0(FileTransferOperation, 1);
+    xfer_op->progress_callback = progress_callback;
+    xfer_op->progress_callback_data = progress_callback_data;
+    xfer_op->channel = channel;
+    xfer_op->tasks = spice_file_transfer_task_create_tasks(channel,
+                                                           sources,
+                                                           flags,
+                                                           cancellable,
+                                                           file_xfer_flush_callback,
+                                                           xfer_op,
+                                                           callback,
+                                                           user_data);
+    spice_debug("New file-transfer-operation %p", xfer_op);
+    for (it = xfer_op->tasks; it != NULL; it = it->next) {
         SpiceFileTransferTask *task = SPICE_FILE_TRANSFER_TASK(it->data);
         guint32 task_id = spice_file_transfer_task_get_id(task);
 
-        g_hash_table_insert(c->file_xfer_tasks, GUINT_TO_POINTER(task_id), task);
-        g_signal_connect(task, "file-info", G_CALLBACK(file_xfer_on_file_info), channel);
+        g_hash_table_insert(c->file_xfer_tasks, GUINT_TO_POINTER(task_id), xfer_op);
+        g_signal_connect(task, "file-info", G_CALLBACK(file_xfer_on_file_info), xfer_op);
         g_signal_connect(task, "finished", G_CALLBACK(task_finished), channel);
         g_signal_emit(channel, signals[SPICE_MAIN_NEW_FILE_TRANSFER], 0, task);
         spice_file_transfer_task_start_task(task);
     }
-    g_list_free(tasks);
 }
 
 /**
@@ -3251,26 +3347,6 @@ static void spice_file_transfer_task_flush_done(SpiceFileTransferTask *self, GEr
         }
     }
 
-    if (self->progress_callback) {
-        goffset read = 0;
-        goffset total = 0;
-        SpiceMainChannel *main_channel = self->channel;
-        GHashTableIter iter;
-        gpointer key, value;
-
-        /* since the progress_callback does not have a parameter to indicate
-         * which file the progress is associated with, report progress on all
-         * current transfers */
-        g_hash_table_iter_init(&iter, main_channel->priv->file_xfer_tasks);
-        while (g_hash_table_iter_next(&iter, &key, &value)) {
-            SpiceFileTransferTask *t = (SpiceFileTransferTask *)value;
-            read += t->read_bytes;
-            total += t->file_size;
-        }
-
-        self->progress_callback(read, total, self->progress_callback_data);
-    }
-
     /* Read more data */
     file_xfer_continue_read(self);
 }
@@ -3279,8 +3355,6 @@ static GList *spice_file_transfer_task_create_tasks(SpiceMainChannel *channel,
                                                     GFile **files,
                                                     GFileCopyFlags flags,
                                                     GCancellable *cancellable,
-                                                    GFileProgressCallback progress_callback,
-                                                    gpointer progress_callback_data,
                                                     SpiceFileTransferTaskFlushCb flush_callback,
                                                     gpointer flush_callback_data,
                                                     GAsyncReadyCallback callback,
@@ -3300,8 +3374,6 @@ static GList *spice_file_transfer_task_create_tasks(SpiceMainChannel *channel,
 
         task = spice_file_transfer_task_new(channel, files[i], task_cancellable);
         task->flags = flags;
-        task->progress_callback = progress_callback;
-        task->progress_callback_data = progress_callback_data;
         task->flush_callback = flush_callback;
         task->flush_callback_data = flush_callback_data;
         task->callback = callback;
-- 
2.5.5



More information about the Spice-devel mailing list