[Spice-devel] [spice-gtk v3 06/16] file-xfer: a FileTransferOperation per transfer call

Victor Toso victortoso at redhat.com
Sun Jun 5 09:31:19 UTC 2016


On Fri, Jun 03, 2016 at 11:42:13AM -0500, Jonathon Jongsma wrote:
> On Mon, 2016-05-30 at 11:55 +0200, Victor Toso wrote:
> > Each call to spice_main_file_copy_async will now create a
> > FileTransferOperation which groups all SpiceFileTransferTasks of the
> > copy operation and also the progress_callback passed from Application.
> > 
> > As pointed in the fix 113093dd00a1cf10f6d3c3589b7589a184cec081, the
> > progress_callback should provide information of the whole transfer
> > operation; For that reason, there is no need to keep progress_callback
> > and progress_callback_data per SpiceFileTransferTask but per
> > FileTransferOperation.
> > 
> > The file_xfer_tasks hash table now has FileTransferOperation instead
> > of SpiceFileTransferTask. To improve handling this new operation, I've
> > created the helpers:
> > 
> > * file_transfer_operation_send_progress
> > * file_transfer_operation_end
> > * file_transfer_operation_reset_all
> > * file_transfer_operation_find_task_by_id
> > * file_transfer_operation_task_finished
> > 
> > This change is related to split SpiceFileTransferTask from
> > channel-main.
> > ---
> >  src/channel-main.c | 206 ++++++++++++++++++++++++++++++++++++--------------
> > ---
> >  1 file changed, 139 insertions(+), 67 deletions(-)
> > 
> > diff --git a/src/channel-main.c b/src/channel-main.c
> > index 72dcf1f..f36326d 100644
> > --- a/src/channel-main.c
> > +++ b/src/channel-main.c
> > @@ -66,8 +66,6 @@ static GList
> > *spice_file_transfer_task_create_tasks(SpiceMainChannel *channel,
> >                                                      GFile **files,
> >                                                      GFileCopyFlags flags,
> >                                                      GCancellable
> > *cancellable,
> > -                                                    GFileProgressCallback
> > progress_callback,
> > -                                                    gpointer
> > progress_callback_data,
> >                                                      SpiceFileTransferTaskFlus
> > hCb flush_callback,
> >                                                      gpointer
> > flush_callback_data,
> >                                                      GAsyncReadyCallback
> > callback,
> > @@ -103,8 +101,6 @@ struct _SpiceFileTransferTask
> >      GFileInputStream               *file_stream;
> >      GFileCopyFlags                 flags;
> >      GCancellable                   *cancellable;
> > -    GFileProgressCallback          progress_callback;
> > -    gpointer                       progress_callback_data;
> >      SpiceFileTransferTaskFlushCb   flush_callback;
> >      gpointer                       flush_callback_data;
> >      GAsyncReadyCallback            callback;
> > @@ -156,6 +152,15 @@ typedef struct {
> >      SpiceDisplayState       display_state;
> >  } SpiceDisplayConfig;
> >  
> > +typedef struct {
> > +    GList                      *tasks;
> > +    SpiceMainChannel           *channel;
> > +    GFileProgressCallback       progress_callback;
> > +    gpointer                    progress_callback_data;
> > +    goffset                     total_sent;
> > +    goffset                     transfer_size;
> > +} FileTransferOperation;
> > +
> >  struct _SpiceMainChannelPrivate  {
> >      enum SpiceMouseMode         mouse_mode;
> >      enum SpiceMouseMode         requested_mouse_mode;
> > @@ -257,6 +262,13 @@ static void file_xfer_flushed(SpiceMainChannel *channel,
> > gboolean success);
> >  static void spice_main_set_max_clipboard(SpiceMainChannel *self, gint max);
> >  static void set_agent_connected(SpiceMainChannel *channel, gboolean
> > connected);
> >  
> > +static void file_transfer_operation_send_progress(SpiceMainChannel *channel,
> > SpiceFileTransferTask *xfer_task);
> > +static void file_transfer_operation_end(FileTransferOperation *xfer_op);
> > +static void file_transfer_operation_reset_all(SpiceMainChannel *channel);
> > +static SpiceFileTransferTask
> > *file_transfer_operation_find_task_by_id(SpiceMainChannel *channel,
> > +                                                                      guint32
> > task_id);
> > +static void file_transfer_operation_task_finished(SpiceMainChannel *channel,
> > SpiceFileTransferTask *task);
> > +
> >  /* ------------------------------------------------------------------ */
> >  
> >  static const char *agent_msg_types[] = {
> > @@ -315,8 +327,7 @@ static void spice_main_channel_init(SpiceMainChannel
> > *channel)
> >  
> >      c = channel->priv = SPICE_MAIN_CHANNEL_GET_PRIVATE(channel);
> >      c->agent_msg_queue = g_queue_new();
> > -    c->file_xfer_tasks = g_hash_table_new_full(g_direct_hash, g_direct_equal,
> > -                                               NULL, g_object_unref);
> > +    c->file_xfer_tasks = g_hash_table_new(g_direct_hash, g_direct_equal);
> >      c->flushing = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL,
> >                                          g_object_unref);
> >      c->cancellable_volume_info = g_cancellable_new();
> > @@ -470,9 +481,6 @@ static void spice_channel_iterate_write(SpiceChannel
> > *channel)
> >  static void spice_main_channel_reset_agent(SpiceMainChannel *channel)
> >  {
> >      SpiceMainChannelPrivate *c = channel->priv;
> > -    GError *error;
> > -    GList *tasks;
> > -    GList *l;
> >  
> >      c->agent_connected = FALSE;
> >      c->agent_caps_received = FALSE;
> > @@ -481,15 +489,7 @@ static void
> > spice_main_channel_reset_agent(SpiceMainChannel *channel)
> >      g_clear_pointer(&c->agent_msg_data, g_free);
> >      c->agent_msg_size = 0;
> >  
> > -    tasks = g_hash_table_get_values(c->file_xfer_tasks);
> > -    for (l = tasks; l != NULL; l = l->next) {
> > -        SpiceFileTransferTask *task = (SpiceFileTransferTask *)l->data;
> > -
> > -        error = g_error_new(SPICE_CLIENT_ERROR, SPICE_CLIENT_ERROR_FAILED,
> > -                            "Agent connection closed");
> > -        spice_file_transfer_task_completed(task, error);
> > -    }
> > -    g_list_free(tasks);
> > +    file_transfer_operation_reset_all(channel);
> >      file_xfer_flushed(channel, FALSE);
> >  }
> >  
> > @@ -1878,7 +1878,9 @@ static void file_xfer_data_flushed_cb(GObject
> > *source_object,
> >      SpiceMainChannel *channel = (SpiceMainChannel *)source_object;
> >      GError *error = NULL;
> >  
> > -    file_xfer_flush_finish(channel, res, &error);
> > +    if (file_xfer_flush_finish(channel, res, &error))
> > +        file_transfer_operation_send_progress(channel, self);
> > +
>
> Slight change of behavior here. Previously it seems that we sent the progress
> even if flush_finish was not successful. I don't know what the right behavior
> is. I assume this was intentional?

It was, but I don't recall exactly the issue that I had. Overall, I think
it makes sense to send progress only when some progress was actually made,
and not on failures.

>
> >      spice_file_transfer_task_flush_done(self, error);
> >  }
> >  
> > @@ -1905,7 +1907,9 @@ static void
> > file_xfer_flush_callback(SpiceFileTransferTask *xfer_task,
> >  {
> >      SpiceMainChannel *main_channel;
> >      GCancellable *cancellable;
> > +    FileTransferOperation *xfer_op = user_data;
> >  
> > +    xfer_op->total_sent += count;
> >      file_xfer_queue(xfer_task, buffer, count);
> >      if (count == 0)
> >          return;
> > @@ -2146,7 +2150,7 @@ static void main_agent_handle_msg(SpiceChannel *channel,
> >          SpiceFileTransferTask *task;
> >          VDAgentFileXferStatusMessage *msg = payload;
> >  
> > -        task = g_hash_table_lookup(c->file_xfer_tasks, GUINT_TO_POINTER(msg-
> > >id));
> > +        task = file_transfer_operation_find_task_by_id(self, msg->id);
> >          if (task != NULL) {
> >              spice_file_transfer_task_handle_status(task, msg);
> >          } else {
> > @@ -3042,21 +3046,22 @@ static void
> > file_xfer_on_file_info(SpiceFileTransferTask *xfer_task,
> >                                     GFileInfo *info,
> >                                     gpointer data)
> >  {
> > -    SpiceMainChannel *channel;
> >      GKeyFile *keyfile;
> >      VDAgentFileXferStartMessage msg;
> >      gchar *string, *basename;
> >      guint64 file_size;
> >      gsize data_len;
> >      GError *error = NULL;
> > +    FileTransferOperation *xfer_op;
> >  
> >      g_return_if_fail(info != NULL);
> > -
> > -    channel = SPICE_MAIN_CHANNEL(data);
> > +    xfer_op = data;
> >  
> >      basename = g_file_info_get_attribute_as_string(info,
> > G_FILE_ATTRIBUTE_STANDARD_NAME);
> >      file_size = g_file_info_get_attribute_uint64(info,
> > G_FILE_ATTRIBUTE_STANDARD_SIZE);
> >  
> > +    xfer_op->transfer_size += file_size;
> > +
> >      keyfile = g_key_file_new();
> >      g_key_file_set_string(keyfile, "vdagent-file-xfer", "name", basename);
> >      g_key_file_set_uint64(keyfile, "vdagent-file-xfer", "size", file_size);
> > @@ -3072,34 +3077,122 @@ static void
> > file_xfer_on_file_info(SpiceFileTransferTask *xfer_task,
> >  
> >      /* Create file-xfer start message */
> >      msg.id = spice_file_transfer_task_get_id(xfer_task);
> > -    agent_msg_queue_many(channel, VD_AGENT_FILE_XFER_START,
> > +    agent_msg_queue_many(xfer_op->channel, VD_AGENT_FILE_XFER_START,
> >                           &msg, sizeof(msg),
> >                           string, data_len + 1, NULL);
> >      g_free(string);
> > -    spice_channel_wakeup(SPICE_CHANNEL(channel), FALSE);
> > +    spice_channel_wakeup(SPICE_CHANNEL(xfer_op->channel), FALSE);
> >      return;
> >  
> >  failed:
> >      spice_file_transfer_task_completed(xfer_task, error);
> >  }
> >  
> > +static void file_transfer_operation_send_progress(SpiceMainChannel *channel,
> > SpiceFileTransferTask *xfer_task)
> > +{
> > +    FileTransferOperation *xfer_op;
> > +    guint32 task_id;
> > +
> > +    task_id = spice_file_transfer_task_get_id(xfer_task);
> > +    xfer_op = g_hash_table_lookup(channel->priv->file_xfer_tasks,
> > GUINT_TO_POINTER(task_id));
> > +    g_return_if_fail(xfer_op != NULL);
> > +
> > +    if (xfer_op->progress_callback)
> > +        xfer_op->progress_callback(xfer_op->total_sent,
> > +                                   xfer_op->transfer_size,
> > +                                   xfer_op->progress_callback_data);
> > +}
> > +
> > +static void file_transfer_operation_end(FileTransferOperation *xfer_op)
> > +{
> > +    g_return_if_fail(xfer_op != NULL);
> > +    spice_debug("Freeing file-transfer-operation %p", xfer_op);
> > +
> > +    /* SpiceFileTransferTask itself is freed after it emits "finish" */
> > +    if (xfer_op->tasks != NULL)
> > +        g_list_free(xfer_op->tasks);
>
> Is there any situation where we would expect tasks to be non-NULL?
> Should we at least print a warning for this case?

Agreed, as it would leak the list pointer. I'll change it.

>
> > +
> > +    g_free(xfer_op);
> > +}
> > +
> > +static void file_transfer_operation_reset_all(SpiceMainChannel *channel)
> > +{
> > +    GList *list_op, *it_op;
> > +
> > +    list_op = g_hash_table_get_values(channel->priv->file_xfer_tasks);
> > +    for (it_op = list_op; it_op != NULL; it_op = it_op->next) {
> > +        FileTransferOperation *op = it_op->data;
> > +        GList *it;
> > +        for (it = op->tasks; it != NULL; it = it->next) {
> > +            SpiceFileTransferTask *xfer_task = SPICE_FILE_TRANSFER_TASK(it-
> > >data);
> > +            GError *error = g_error_new(SPICE_CLIENT_ERROR,
> > SPICE_CLIENT_ERROR_FAILED,
> > +                                        "Agent connection closed");
> > +            spice_file_transfer_task_completed(xfer_task, error);
> > +        }
>
> Since the operation is present in the hash table once for each task that it
> contains, it seems that above you will be completing the same task multiple
> times for these multi-task operations.
>
> For example, if Operation A has 4 tasks, g_hash_table_get_values() will return a
> list of 4 items, all containing operation A. So I think you'll call
> spice_file_transfer_task_completed() for each of the tasks in operation A four
> times. 

That's true. I have confused myself a few times here already because of the
list_op provided by g_hash_table_get_values on file_xfer_tasks. I think
we can improve that based on the comment below.

>
> > +    }
> > +    g_list_free(list_op);
> > +}
> > +
> > +static SpiceFileTransferTask
> > *file_transfer_operation_find_task_by_id(SpiceMainChannel *channel,
> > +                                                                      guint32
> > task_id)
> > +{
> > +    FileTransferOperation *op;
> > +    GList *it;
> > +
> > +    op = g_hash_table_lookup (channel->priv->file_xfer_tasks,
> > GUINT_TO_POINTER(task_id));
> > +    g_return_val_if_fail (op != NULL, NULL);
> > +
> > +    for (it = op->tasks; it != NULL; it = it->next) {
> > +        SpiceFileTransferTask *xfer_task = SPICE_FILE_TRANSFER_TASK(it-
> > >data);
> > +        guint32 id = spice_file_transfer_task_get_id(xfer_task);
> > +        if (id == task_id)
> > +            return xfer_task;
> > +    }
> > +    return NULL;
> > +}
>
> This feels a bit convoluted to me. There is a hash table that relates
> a task ID to a FileTransferOperation. The FileTransferOperation has a
> list of tasks, and each task has an ID. And since the Operations are
> inserted into the hash table once for each task they contain, you may
> iterate over the same operation multiple times before you get to the
> operation that contains your task. For example, consider where you
> have two operations: OperationA with 10 tasks, Operation B with only
> 1. And say you're looking for the task in operation B. You could end
> up iterating through the ten tasks of operation A ten times (100 total
> iterations) before you get to the task you want.

Unless I'm missing something, I don't think this is correct. In the code
above, we get the correct FileTransferOperation from g_hash_table_lookup
(so, in your example, that would be operation B) and the for loop below would
iterate only once to find the correct SpiceFileTransferTask.

If the example was about how much we iterate in a big
FileTransferOperation, for instance one with 100 items, that would be bad
indeed, and in this case we might consider moving away from GList to
GHashTable in the FileTransferOperation. If we agree on this, I would
change the _create_tasks() function to return a GHashTable instead of
a GList.

> Granted, this isn't a performance-critical path, but considering the
> other issues mentioned above, I feel there's probably a better design.

Indeed, it could be better, what do you think about the following:

* channel->priv->file_xfer_tasks: (GHashTable) that provides mapping
  from task_id to FileTransferOperation ~ useful to get the correct
  xfer_op based on task_id which is what we receive from Agent

* xfer_op->tasks: (GHashTable) that provides mapping from task_id to the
  actual SpiceFileTransferTask pointer ~ This should let us not worry about
  performance with large sets of tasks.

>
> > +
> > +static void file_transfer_operation_task_finished(SpiceMainChannel *channel,
> > SpiceFileTransferTask *task)
> > +{
> > +    FileTransferOperation *xfer_op;
> > +    guint32 task_id;
> > +
> > +    task_id = spice_file_transfer_task_get_id(task);
> > +    xfer_op = g_hash_table_lookup(channel->priv->file_xfer_tasks,
> > GUINT_TO_POINTER(task_id));
> > +    if (xfer_op == NULL) {
> > +        /* Likely the operation has ended before the remove-task was called.
> > One
> > +         * situation that this can easily happen is if the agent is
> > disconnected
> > +         * while there are pending files. */
> > +        g_object_unref(task);
> > +        return;
> > +    }
> > +    /* Remove and free SpiceFileTransferTask */
> > +    xfer_op->tasks = g_list_remove(xfer_op->tasks, task);
> > +    g_object_unref(task);
> > +
> > +    /* Keep file-xfer-tasks up to date. If no more elements, operation is
> > over */
> > +    g_hash_table_remove(channel->priv->file_xfer_tasks,
> > GUINT_TO_POINTER(task_id));
> > +
> > +    /* No more pending operations */
> > +    if (xfer_op->tasks == NULL)
> > +        file_transfer_operation_end(xfer_op);
> > +}
> > +
> >  static void task_finished(SpiceFileTransferTask *task,
> >                            GError *error,
> >                            gpointer data)
> >  {
> >      SpiceMainChannel *channel = SPICE_MAIN_CHANNEL(data);
> > -    guint32 task_id = spice_file_transfer_task_get_id(task);
> >  
> >      if (error) {
> >          VDAgentFileXferStatusMessage msg;
> > -        msg.id = task_id;
> > +        msg.id = spice_file_transfer_task_get_id(task);
> >          msg.result = error->code == G_IO_ERROR_CANCELLED ?
> >                  VD_AGENT_FILE_XFER_STATUS_CANCELLED :
> > VD_AGENT_FILE_XFER_STATUS_ERROR;
> >          agent_msg_queue_many(channel, VD_AGENT_FILE_XFER_STATUS,
> >                               &msg, sizeof(msg), NULL);
> >      }
> >  
> > -    g_hash_table_remove(channel->priv->file_xfer_tasks,
> > GUINT_TO_POINTER(task_id));
> > +    file_transfer_operation_task_finished(channel, task);
> >  }
> >  
> >  /**
> > @@ -3145,7 +3238,8 @@ void spice_main_file_copy_async(SpiceMainChannel
> > *channel,
> >                                  gpointer user_data)
> >  {
> >      SpiceMainChannelPrivate *c = channel->priv;
> > -    GList *tasks, *it;
> > +    GList *it;
> > +    FileTransferOperation *xfer_op;
> >  
> >      g_return_if_fail(channel != NULL);
> >      g_return_if_fail(SPICE_IS_MAIN_CHANNEL(channel));
> > @@ -3162,27 +3256,29 @@ void spice_main_file_copy_async(SpiceMainChannel
> > *channel,
> >          return;
> >      }
> >  
> > -    tasks = spice_file_transfer_task_create_tasks(channel,
> > -                                                  sources,
> > -                                                  flags,
> > -                                                  cancellable,
> > -                                                  progress_callback,
> > -                                                  progress_callback_data,
> > -                                                  file_xfer_flush_callback,
> > -                                                  NULL,
> > -                                                  callback,
> > -                                                  user_data);
> > -    for (it = tasks; it != NULL; it = it->next) {
> > +    xfer_op = g_new0(FileTransferOperation, 1);
> > +    xfer_op->progress_callback = progress_callback;
> > +    xfer_op->progress_callback_data = progress_callback_data;
> > +    xfer_op->channel = channel;
> > +    xfer_op->tasks = spice_file_transfer_task_create_tasks(channel,
> > +                                                           sources,
> > +                                                           flags,
> > +                                                           cancellable,
> > +                                                           file_xfer_flush_ca
> > llback,
> > +                                                           xfer_op,
> > +                                                           callback,
> > +                                                           user_data);
>
> What about changing 
>
> GList *spice_file_transfer_task_create_tasks()
>
> to
>
> FileTransferOperation spice_file_transfer_operation_create()?

I would agree to change the return value to GHashTable but not to the
FileTransferOperation structure: that struct is the wrapper around the data
of a spice_main_file_copy_async call, so it makes sense to me to keep it
internal to channel-main.

I don't mind changing the function name, but maybe to
spice_file_transfer_task_operation_create() instead, or just make the
previous one singular, spice_file_transfer_task_create_task, as I
often associate plurals with lists but not with hash tables.

>
>
> > +    spice_debug("New file-transfer-operation %p", xfer_op);
> > +    for (it = xfer_op->tasks; it != NULL; it = it->next) {
> >          SpiceFileTransferTask *task = SPICE_FILE_TRANSFER_TASK(it->data);
> >          guint32 task_id = spice_file_transfer_task_get_id(task);
> >  
> > -        g_hash_table_insert(c->file_xfer_tasks, GUINT_TO_POINTER(task_id),
> > task);
> > -        g_signal_connect(task, "file-info",
> > G_CALLBACK(file_xfer_on_file_info), channel);
> > +        g_hash_table_insert(c->file_xfer_tasks, GUINT_TO_POINTER(task_id),
> > xfer_op);
> > +        g_signal_connect(task, "file-info",
> > G_CALLBACK(file_xfer_on_file_info), xfer_op);
> >          g_signal_connect(task, "finished", G_CALLBACK(task_finished),
> > channel);
> >          g_signal_emit(channel, signals[SPICE_MAIN_NEW_FILE_TRANSFER], 0,
> > task);
> >          spice_file_transfer_task_start_task(task);
> >      }
> > -    g_list_free(tasks);
> >  }
> >  
> >  /**
> > @@ -3251,26 +3347,6 @@ static void
> > spice_file_transfer_task_flush_done(SpiceFileTransferTask *self, GEr
> >          }
> >      }
> >  
> > -    if (self->progress_callback) {
> > -        goffset read = 0;
> > -        goffset total = 0;
> > -        SpiceMainChannel *main_channel = self->channel;
> > -        GHashTableIter iter;
> > -        gpointer key, value;
> > -
> > -        /* since the progress_callback does not have a parameter to indicate
> > -         * which file the progress is associated with, report progress on all
> > -         * current transfers */
> > -        g_hash_table_iter_init(&iter, main_channel->priv->file_xfer_tasks);
> > -        while (g_hash_table_iter_next(&iter, &key, &value)) {
> > -            SpiceFileTransferTask *t = (SpiceFileTransferTask *)value;
> > -            read += t->read_bytes;
> > -            total += t->file_size;
> > -        }
> > -
> > -        self->progress_callback(read, total, self->progress_callback_data);
> > -    }
> > -
> >      /* Read more data */
> >      file_xfer_continue_read(self);
> >  }
> > @@ -3279,8 +3355,6 @@ static GList
> > *spice_file_transfer_task_create_tasks(SpiceMainChannel *channel,
> >                                                      GFile **files,
> >                                                      GFileCopyFlags flags,
> >                                                      GCancellable
> > *cancellable,
> > -                                                    GFileProgressCallback
> > progress_callback,
> > -                                                    gpointer
> > progress_callback_data,
> >                                                      SpiceFileTransferTaskFlus
> > hCb flush_callback,
> >                                                      gpointer
> > flush_callback_data,
> >                                                      GAsyncReadyCallback
> > callback,
> > @@ -3300,8 +3374,6 @@ static GList
> > *spice_file_transfer_task_create_tasks(SpiceMainChannel *channel,
> >  
> >          task = spice_file_transfer_task_new(channel, files[i],
> > task_cancellable);
> >          task->flags = flags;
> > -        task->progress_callback = progress_callback;
> > -        task->progress_callback_data = progress_callback_data;
> >          task->flush_callback = flush_callback;
> >          task->flush_callback_data = flush_callback_data;
> >          task->callback = callback;
>
>
> Perhaps it would be simpler to simply keep the hash table as a map between task
> ID and SpiceFileTransferTask, but add a field to SpiceFileTransferTask pointing
> to its operation?

That would do as well! (sorry, I should have read the whole email before
starting to reply).

So, with my proposal, we would do:
1a) from task_id, get the FileTransferOperation (xfer_op) from
    channel->priv->file_xfer_tasks
2a) from task_id, get the SpiceFileTransferTask (xfer_task) from xfer_op's
    GHashTable

With your proposal, we would do:
1b) from task_id, get the SpiceFileTransferTask (xfer_task) from
    channel->priv->file_xfer_tasks
2b) from xfer_task, get xfer_op

I still prefer my proposal because we wouldn't need to share
FileTransferOperation logic with SpiceFileTransferTask.

>
> Reviewed-by: Jonathon Jongsma <jjongsma at redhat.com>

Let me know what you think!
  toso


More information about the Spice-devel mailing list