[Spice-devel] [spice-gtk v3 06/16] file-xfer: a FileTransferOperation per transfer call

Jonathon Jongsma jjongsma at redhat.com
Fri Jun 3 16:42:13 UTC 2016


On Mon, 2016-05-30 at 11:55 +0200, Victor Toso wrote:
> Each call to spice_main_file_copy_async will now create a
> FileTransferOperation which groups all SpiceFileTransferTasks of the
> copy operation and also the progress_callback passed from Application.
> 
> As pointed in the fix 113093dd00a1cf10f6d3c3589b7589a184cec081, the
> progress_callback should provide information of the whole transfer
> operation; For that reason, there is no need to keep progress_callback
> and progress_callback_data per SpiceFileTransferTask but per
> FileTransferOperation.
> 
> The file_xfer_tasks hash table now has FileTransferOperation instead
> of SpiceFileTransferTask. To improve handling this new operation, I've
> created the helpers:
> 
> * file_transfer_operation_send_progress
> * file_transfer_operation_end
> * file_transfer_operation_reset_all
> * file_transfer_operation_find_task_by_id
> * file_transfer_operation_task_finished
> 
> This change is related to split SpiceFileTransferTask from
> channel-main.
> ---
>  src/channel-main.c | 206 ++++++++++++++++++++++++++++++++++++--------------
> ---
>  1 file changed, 139 insertions(+), 67 deletions(-)
> 
> diff --git a/src/channel-main.c b/src/channel-main.c
> index 72dcf1f..f36326d 100644
> --- a/src/channel-main.c
> +++ b/src/channel-main.c
> @@ -66,8 +66,6 @@ static GList
> *spice_file_transfer_task_create_tasks(SpiceMainChannel *channel,
>                                                      GFile **files,
>                                                      GFileCopyFlags flags,
>                                                      GCancellable
> *cancellable,
> -                                                    GFileProgressCallback
> progress_callback,
> -                                                    gpointer
> progress_callback_data,
>                                                      SpiceFileTransferTaskFlus
> hCb flush_callback,
>                                                      gpointer
> flush_callback_data,
>                                                      GAsyncReadyCallback
> callback,
> @@ -103,8 +101,6 @@ struct _SpiceFileTransferTask
>      GFileInputStream               *file_stream;
>      GFileCopyFlags                 flags;
>      GCancellable                   *cancellable;
> -    GFileProgressCallback          progress_callback;
> -    gpointer                       progress_callback_data;
>      SpiceFileTransferTaskFlushCb   flush_callback;
>      gpointer                       flush_callback_data;
>      GAsyncReadyCallback            callback;
> @@ -156,6 +152,15 @@ typedef struct {
>      SpiceDisplayState       display_state;
>  } SpiceDisplayConfig;
>  
> +typedef struct {
> +    GList                      *tasks;
> +    SpiceMainChannel           *channel;
> +    GFileProgressCallback       progress_callback;
> +    gpointer                    progress_callback_data;
> +    goffset                     total_sent;
> +    goffset                     transfer_size;
> +} FileTransferOperation;
> +
>  struct _SpiceMainChannelPrivate  {
>      enum SpiceMouseMode         mouse_mode;
>      enum SpiceMouseMode         requested_mouse_mode;
> @@ -257,6 +262,13 @@ static void file_xfer_flushed(SpiceMainChannel *channel,
> gboolean success);
>  static void spice_main_set_max_clipboard(SpiceMainChannel *self, gint max);
>  static void set_agent_connected(SpiceMainChannel *channel, gboolean
> connected);
>  
> +static void file_transfer_operation_send_progress(SpiceMainChannel *channel,
> SpiceFileTransferTask *xfer_task);
> +static void file_transfer_operation_end(FileTransferOperation *xfer_op);
> +static void file_transfer_operation_reset_all(SpiceMainChannel *channel);
> +static SpiceFileTransferTask
> *file_transfer_operation_find_task_by_id(SpiceMainChannel *channel,
> +                                                                      guint32
> task_id);
> +static void file_transfer_operation_task_finished(SpiceMainChannel *channel,
> SpiceFileTransferTask *task);
> +
>  /* ------------------------------------------------------------------ */
>  
>  static const char *agent_msg_types[] = {
> @@ -315,8 +327,7 @@ static void spice_main_channel_init(SpiceMainChannel
> *channel)
>  
>      c = channel->priv = SPICE_MAIN_CHANNEL_GET_PRIVATE(channel);
>      c->agent_msg_queue = g_queue_new();
> -    c->file_xfer_tasks = g_hash_table_new_full(g_direct_hash, g_direct_equal,
> -                                               NULL, g_object_unref);
> +    c->file_xfer_tasks = g_hash_table_new(g_direct_hash, g_direct_equal);
>      c->flushing = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL,
>                                          g_object_unref);
>      c->cancellable_volume_info = g_cancellable_new();
> @@ -470,9 +481,6 @@ static void spice_channel_iterate_write(SpiceChannel
> *channel)
>  static void spice_main_channel_reset_agent(SpiceMainChannel *channel)
>  {
>      SpiceMainChannelPrivate *c = channel->priv;
> -    GError *error;
> -    GList *tasks;
> -    GList *l;
>  
>      c->agent_connected = FALSE;
>      c->agent_caps_received = FALSE;
> @@ -481,15 +489,7 @@ static void
> spice_main_channel_reset_agent(SpiceMainChannel *channel)
>      g_clear_pointer(&c->agent_msg_data, g_free);
>      c->agent_msg_size = 0;
>  
> -    tasks = g_hash_table_get_values(c->file_xfer_tasks);
> -    for (l = tasks; l != NULL; l = l->next) {
> -        SpiceFileTransferTask *task = (SpiceFileTransferTask *)l->data;
> -
> -        error = g_error_new(SPICE_CLIENT_ERROR, SPICE_CLIENT_ERROR_FAILED,
> -                            "Agent connection closed");
> -        spice_file_transfer_task_completed(task, error);
> -    }
> -    g_list_free(tasks);
> +    file_transfer_operation_reset_all(channel);
>      file_xfer_flushed(channel, FALSE);
>  }
>  
> @@ -1878,7 +1878,9 @@ static void file_xfer_data_flushed_cb(GObject
> *source_object,
>      SpiceMainChannel *channel = (SpiceMainChannel *)source_object;
>      GError *error = NULL;
>  
> -    file_xfer_flush_finish(channel, res, &error);
> +    if (file_xfer_flush_finish(channel, res, &error))
> +        file_transfer_operation_send_progress(channel, self);
> +

Slight change of behavior here. Previously it seems that we sent the progress
even if flush_finish was not successful. I don't know what the right behavior
is. I assume this was intentional?

>      spice_file_transfer_task_flush_done(self, error);
>  }
>  
> @@ -1905,7 +1907,9 @@ static void
> file_xfer_flush_callback(SpiceFileTransferTask *xfer_task,
>  {
>      SpiceMainChannel *main_channel;
>      GCancellable *cancellable;
> +    FileTransferOperation *xfer_op = user_data;
>  
> +    xfer_op->total_sent += count;
>      file_xfer_queue(xfer_task, buffer, count);
>      if (count == 0)
>          return;
> @@ -2146,7 +2150,7 @@ static void main_agent_handle_msg(SpiceChannel *channel,
>          SpiceFileTransferTask *task;
>          VDAgentFileXferStatusMessage *msg = payload;
>  
> -        task = g_hash_table_lookup(c->file_xfer_tasks, GUINT_TO_POINTER(msg-
> >id));
> +        task = file_transfer_operation_find_task_by_id(self, msg->id);
>          if (task != NULL) {
>              spice_file_transfer_task_handle_status(task, msg);
>          } else {
> @@ -3042,21 +3046,22 @@ static void
> file_xfer_on_file_info(SpiceFileTransferTask *xfer_task,
>                                     GFileInfo *info,
>                                     gpointer data)
>  {
> -    SpiceMainChannel *channel;
>      GKeyFile *keyfile;
>      VDAgentFileXferStartMessage msg;
>      gchar *string, *basename;
>      guint64 file_size;
>      gsize data_len;
>      GError *error = NULL;
> +    FileTransferOperation *xfer_op;
>  
>      g_return_if_fail(info != NULL);
> -
> -    channel = SPICE_MAIN_CHANNEL(data);
> +    xfer_op = data;
>  
>      basename = g_file_info_get_attribute_as_string(info,
> G_FILE_ATTRIBUTE_STANDARD_NAME);
>      file_size = g_file_info_get_attribute_uint64(info,
> G_FILE_ATTRIBUTE_STANDARD_SIZE);
>  
> +    xfer_op->transfer_size += file_size;
> +
>      keyfile = g_key_file_new();
>      g_key_file_set_string(keyfile, "vdagent-file-xfer", "name", basename);
>      g_key_file_set_uint64(keyfile, "vdagent-file-xfer", "size", file_size);
> @@ -3072,34 +3077,122 @@ static void
> file_xfer_on_file_info(SpiceFileTransferTask *xfer_task,
>  
>      /* Create file-xfer start message */
>      msg.id = spice_file_transfer_task_get_id(xfer_task);
> -    agent_msg_queue_many(channel, VD_AGENT_FILE_XFER_START,
> +    agent_msg_queue_many(xfer_op->channel, VD_AGENT_FILE_XFER_START,
>                           &msg, sizeof(msg),
>                           string, data_len + 1, NULL);
>      g_free(string);
> -    spice_channel_wakeup(SPICE_CHANNEL(channel), FALSE);
> +    spice_channel_wakeup(SPICE_CHANNEL(xfer_op->channel), FALSE);
>      return;
>  
>  failed:
>      spice_file_transfer_task_completed(xfer_task, error);
>  }
>  
> +static void file_transfer_operation_send_progress(SpiceMainChannel *channel,
> SpiceFileTransferTask *xfer_task)
> +{
> +    FileTransferOperation *xfer_op;
> +    guint32 task_id;
> +
> +    task_id = spice_file_transfer_task_get_id(xfer_task);
> +    xfer_op = g_hash_table_lookup(channel->priv->file_xfer_tasks,
> GUINT_TO_POINTER(task_id));
> +    g_return_if_fail(xfer_op != NULL);
> +
> +    if (xfer_op->progress_callback)
> +        xfer_op->progress_callback(xfer_op->total_sent,
> +                                   xfer_op->transfer_size,
> +                                   xfer_op->progress_callback_data);
> +}
> +
> +static void file_transfer_operation_end(FileTransferOperation *xfer_op)
> +{
> +    g_return_if_fail(xfer_op != NULL);
> +    spice_debug("Freeing file-transfer-operation %p", xfer_op);
> +
> +    /* SpiceFileTransferTask itself is freed after it emits "finish" */
> +    if (xfer_op->tasks != NULL)
> +        g_list_free(xfer_op->tasks);

Is there any situation where we would expect tasks to be non-NULL? Should we at
least print a warning for this case?

> +
> +    g_free(xfer_op);
> +}
> +
> +static void file_transfer_operation_reset_all(SpiceMainChannel *channel)
> +{
> +    GList *list_op, *it_op;
> +
> +    list_op = g_hash_table_get_values(channel->priv->file_xfer_tasks);
> +    for (it_op = list_op; it_op != NULL; it_op = it_op->next) {
> +        FileTransferOperation *op = it_op->data;
> +        GList *it;
> +        for (it = op->tasks; it != NULL; it = it->next) {
> +            SpiceFileTransferTask *xfer_task = SPICE_FILE_TRANSFER_TASK(it-
> >data);
> +            GError *error = g_error_new(SPICE_CLIENT_ERROR,
> SPICE_CLIENT_ERROR_FAILED,
> +                                        "Agent connection closed");
> +            spice_file_transfer_task_completed(xfer_task, error);
> +        }

Since the operation is present in the hash table once for each task that it
contains, it seems that above you will be completing the same task multiple
times for these multi-task operations.

For example, if Operation A has 4 tasks, g_hash_table_get_values() will return a
list of 4 items, all containing operation A. So I think you'll call
spice_file_transfer_task_completed() for each of the tasks in operation A four
times. 

> +    }
> +    g_list_free(list_op);
> +}
> +
> +static SpiceFileTransferTask
> *file_transfer_operation_find_task_by_id(SpiceMainChannel *channel,
> +                                                                      guint32
> task_id)
> +{
> +    FileTransferOperation *op;
> +    GList *it;
> +
> +    op = g_hash_table_lookup (channel->priv->file_xfer_tasks,
> GUINT_TO_POINTER(task_id));
> +    g_return_val_if_fail (op != NULL, NULL);
> +
> +    for (it = op->tasks; it != NULL; it = it->next) {
> +        SpiceFileTransferTask *xfer_task = SPICE_FILE_TRANSFER_TASK(it-
> >data);
> +        guint32 id = spice_file_transfer_task_get_id(xfer_task);
> +        if (id == task_id)
> +            return xfer_task;
> +    }
> +    return NULL;
> +}

This feels a bit convoluted to me. There is a hash table that relates a task ID
to a FileTransferOperation. The FileTransferOperation has a list of tasks, and
each task has an ID. And since the Operations are inserted into the hash table
once for each task they contain, you may iterate over the same operation
multiple times before you get to the operation that contains your task. For
example, consider where you have two operations: Operation A with 10 tasks, and
Operation B with only 1. And say you're looking for the task in Operation B. You
could end up iterating through the ten tasks of operation A ten times (100 total
iterations) before you get to the task you want. Granted, this isn't a
performance-critical path, but considering the other issues mentioned above, I
feel there's probably a better design.


> +
> +static void file_transfer_operation_task_finished(SpiceMainChannel *channel,
> SpiceFileTransferTask *task)
> +{
> +    FileTransferOperation *xfer_op;
> +    guint32 task_id;
> +
> +    task_id = spice_file_transfer_task_get_id(task);
> +    xfer_op = g_hash_table_lookup(channel->priv->file_xfer_tasks,
> GUINT_TO_POINTER(task_id));
> +    if (xfer_op == NULL) {
> +        /* Likely the operation has ended before the remove-task was called.
> One
> +         * situation that this can easily happen is if the agent is
> disconnected
> +         * while there are pending files. */
> +        g_object_unref(task);
> +        return;
> +    }
> +    /* Remove and free SpiceFileTransferTask */
> +    xfer_op->tasks = g_list_remove(xfer_op->tasks, task);
> +    g_object_unref(task);
> +
> +    /* Keep file-xfer-tasks up to date. If no more elements, operation is
> over */
> +    g_hash_table_remove(channel->priv->file_xfer_tasks,
> GUINT_TO_POINTER(task_id));
> +
> +    /* No more pending operations */
> +    if (xfer_op->tasks == NULL)
> +        file_transfer_operation_end(xfer_op);
> +}
> +
>  static void task_finished(SpiceFileTransferTask *task,
>                            GError *error,
>                            gpointer data)
>  {
>      SpiceMainChannel *channel = SPICE_MAIN_CHANNEL(data);
> -    guint32 task_id = spice_file_transfer_task_get_id(task);
>  
>      if (error) {
>          VDAgentFileXferStatusMessage msg;
> -        msg.id = task_id;
> +        msg.id = spice_file_transfer_task_get_id(task);
>          msg.result = error->code == G_IO_ERROR_CANCELLED ?
>                  VD_AGENT_FILE_XFER_STATUS_CANCELLED :
> VD_AGENT_FILE_XFER_STATUS_ERROR;
>          agent_msg_queue_many(channel, VD_AGENT_FILE_XFER_STATUS,
>                               &msg, sizeof(msg), NULL);
>      }
>  
> -    g_hash_table_remove(channel->priv->file_xfer_tasks,
> GUINT_TO_POINTER(task_id));
> +    file_transfer_operation_task_finished(channel, task);
>  }
>  
>  /**
> @@ -3145,7 +3238,8 @@ void spice_main_file_copy_async(SpiceMainChannel
> *channel,
>                                  gpointer user_data)
>  {
>      SpiceMainChannelPrivate *c = channel->priv;
> -    GList *tasks, *it;
> +    GList *it;
> +    FileTransferOperation *xfer_op;
>  
>      g_return_if_fail(channel != NULL);
>      g_return_if_fail(SPICE_IS_MAIN_CHANNEL(channel));
> @@ -3162,27 +3256,29 @@ void spice_main_file_copy_async(SpiceMainChannel
> *channel,
>          return;
>      }
>  
> -    tasks = spice_file_transfer_task_create_tasks(channel,
> -                                                  sources,
> -                                                  flags,
> -                                                  cancellable,
> -                                                  progress_callback,
> -                                                  progress_callback_data,
> -                                                  file_xfer_flush_callback,
> -                                                  NULL,
> -                                                  callback,
> -                                                  user_data);
> -    for (it = tasks; it != NULL; it = it->next) {
> +    xfer_op = g_new0(FileTransferOperation, 1);
> +    xfer_op->progress_callback = progress_callback;
> +    xfer_op->progress_callback_data = progress_callback_data;
> +    xfer_op->channel = channel;
> +    xfer_op->tasks = spice_file_transfer_task_create_tasks(channel,
> +                                                           sources,
> +                                                           flags,
> +                                                           cancellable,
> +                                                           file_xfer_flush_ca
> llback,
> +                                                           xfer_op,
> +                                                           callback,
> +                                                           user_data);

What about changing 

GList *spice_file_transfer_task_create_tasks()

to

FileTransferOperation spice_file_transfer_operation_create()?


> +    spice_debug("New file-transfer-operation %p", xfer_op);
> +    for (it = xfer_op->tasks; it != NULL; it = it->next) {
>          SpiceFileTransferTask *task = SPICE_FILE_TRANSFER_TASK(it->data);
>          guint32 task_id = spice_file_transfer_task_get_id(task);
>  
> -        g_hash_table_insert(c->file_xfer_tasks, GUINT_TO_POINTER(task_id),
> task);
> -        g_signal_connect(task, "file-info",
> G_CALLBACK(file_xfer_on_file_info), channel);
> +        g_hash_table_insert(c->file_xfer_tasks, GUINT_TO_POINTER(task_id),
> xfer_op);
> +        g_signal_connect(task, "file-info",
> G_CALLBACK(file_xfer_on_file_info), xfer_op);
>          g_signal_connect(task, "finished", G_CALLBACK(task_finished),
> channel);
>          g_signal_emit(channel, signals[SPICE_MAIN_NEW_FILE_TRANSFER], 0,
> task);
>          spice_file_transfer_task_start_task(task);
>      }
> -    g_list_free(tasks);
>  }
>  
>  /**
> @@ -3251,26 +3347,6 @@ static void
> spice_file_transfer_task_flush_done(SpiceFileTransferTask *self, GEr
>          }
>      }
>  
> -    if (self->progress_callback) {
> -        goffset read = 0;
> -        goffset total = 0;
> -        SpiceMainChannel *main_channel = self->channel;
> -        GHashTableIter iter;
> -        gpointer key, value;
> -
> -        /* since the progress_callback does not have a parameter to indicate
> -         * which file the progress is associated with, report progress on all
> -         * current transfers */
> -        g_hash_table_iter_init(&iter, main_channel->priv->file_xfer_tasks);
> -        while (g_hash_table_iter_next(&iter, &key, &value)) {
> -            SpiceFileTransferTask *t = (SpiceFileTransferTask *)value;
> -            read += t->read_bytes;
> -            total += t->file_size;
> -        }
> -
> -        self->progress_callback(read, total, self->progress_callback_data);
> -    }
> -
>      /* Read more data */
>      file_xfer_continue_read(self);
>  }
> @@ -3279,8 +3355,6 @@ static GList
> *spice_file_transfer_task_create_tasks(SpiceMainChannel *channel,
>                                                      GFile **files,
>                                                      GFileCopyFlags flags,
>                                                      GCancellable
> *cancellable,
> -                                                    GFileProgressCallback
> progress_callback,
> -                                                    gpointer
> progress_callback_data,
>                                                      SpiceFileTransferTaskFlus
> hCb flush_callback,
>                                                      gpointer
> flush_callback_data,
>                                                      GAsyncReadyCallback
> callback,
> @@ -3300,8 +3374,6 @@ static GList
> *spice_file_transfer_task_create_tasks(SpiceMainChannel *channel,
>  
>          task = spice_file_transfer_task_new(channel, files[i],
> task_cancellable);
>          task->flags = flags;
> -        task->progress_callback = progress_callback;
> -        task->progress_callback_data = progress_callback_data;
>          task->flush_callback = flush_callback;
>          task->flush_callback_data = flush_callback_data;
>          task->callback = callback;


Perhaps it would be simpler to keep the hash table as a map between task
ID and SpiceFileTransferTask, but add a field to SpiceFileTransferTask pointing
to its operation?

Reviewed-by: Jonathon Jongsma <jjongsma at redhat.com>



More information about the Spice-devel mailing list