[Spice-devel] [spice-gtk v5 05/23] file-xfer: introduce functions to read file async

Victor Toso lists at victortoso.com
Thu Jul 7 08:49:01 UTC 2016


Hi,

On Wed, Jul 06, 2016 at 09:34:15AM -0500, Jonathon Jongsma wrote:
> On Tue, 2016-07-05 at 15:07 +0200, Victor Toso wrote:
> > Introduced functions (private):
> > * void   spice_file_transfer_task_read_async()
> > * gssize spice_file_transfer_task_read_finish()
> > 
> > For a better abstraction of how to read from SpiceFileTransferTask and
> > handle its data, following the design of other objects like GFile and
> > GInputStream.
> > 
> > Due to the logic changes involved, some functions were created or
> > renamed to better address or match their place and purpose:
> > 
> > * spice_file_transfer_task_read_stream_cb
> >   Callback for the actual read from GInputStream; This is handling
> >   the SpiceFileTransferTask bits only;
> > 
> > * file_xfer_read_cb -> file_xfer_read_async_cb
> >   Renamed to match _read_async() function; This is handling the data
> >   from reading the file by flushing it to the agent.
> > 
> > As the _read_async() uses GTask, the error handling is done on the
> > channel-main's callback, after _read_finish() is called.
> > 
> > This change is related to splitting SpiceFileTransferTask from
> > channel-main.
> > 
> > Acked-by: Jonathon Jongsma <jjongsma at redhat.com>
> > ---
> >  src/channel-main.c | 165 +++++++++++++++++++++++++++++++++++++++-------------
> > -
> >  1 file changed, 122 insertions(+), 43 deletions(-)
> > 
> > diff --git a/src/channel-main.c b/src/channel-main.c
> > index d721852..244b19e 100644
> > --- a/src/channel-main.c
> > +++ b/src/channel-main.c
> > @@ -71,6 +71,13 @@ static void
> > spice_file_transfer_task_init_task_async(SpiceFileTransferTask *self
> >  static GFileInfo
> > *spice_file_transfer_task_init_task_finish(SpiceFileTransferTask *xfer_task,
> >                                                              GAsyncResult
> > *result,
> >                                                              GError **error);
> > +static void spice_file_transfer_task_read_async(SpiceFileTransferTask *self,
> > +                                                GAsyncReadyCallback callback,
> > +                                                gpointer userdata);
> > +static gssize spice_file_transfer_task_read_finish(SpiceFileTransferTask
> > *self,
> > +                                                   GAsyncResult *result,
> > +                                                   char **buffer,
> > +                                                   GError **error);
> >  
> >  /**
> >   * SECTION:file-transfer-task
> > @@ -246,9 +253,11 @@ static void migrate_channel_event_cb(SpiceChannel
> > *channel, SpiceChannelEvent ev
> >                                       gpointer data);
> >  static gboolean main_migrate_handshake_done(gpointer data);
> >  static void spice_main_channel_send_migration_handshake(SpiceChannel
> > *channel);
> > -static void file_xfer_continue_read(SpiceFileTransferTask *task);
> >  static void spice_file_transfer_task_completed(SpiceFileTransferTask *self,
> > GError *error);
> >  static void file_xfer_flushed(SpiceMainChannel *channel, gboolean success);
> > +static void file_xfer_read_async_cb(GObject *source_object,
> > +                                    GAsyncResult *res,
> > +                                    gpointer user_data);
> >  static void spice_main_set_max_clipboard(SpiceMainChannel *self, gint max);
> >  static void set_agent_connected(SpiceMainChannel *channel, gboolean
> > connected);
> >  
> > @@ -1882,7 +1891,6 @@ static void file_xfer_data_flushed_cb(GObject
> > *source_object,
> >      SpiceMainChannel *channel = (SpiceMainChannel *)source_object;
> >      GError *error = NULL;
> >  
> > -    self->pending = FALSE;
> >      file_xfer_flush_finish(channel, res, &error);
> >      if (error || self->error) {
> >          spice_file_transfer_task_completed(self, error);
> > @@ -1923,7 +1931,7 @@ static void file_xfer_data_flushed_cb(GObject
> > *source_object,
> >      }
> >  
> >      /* Read more data */
> > -    file_xfer_continue_read(self);
> > +    spice_file_transfer_task_read_async(self, file_xfer_read_async_cb, NULL);
> >  }
> >  
> >  static void file_xfer_queue(SpiceFileTransferTask *self, int data_size)
> > @@ -1943,54 +1951,35 @@ static void file_xfer_queue(SpiceFileTransferTask
> > *self, int data_size)
> >  }
> >  
> >  /* main context */
> > -static void file_xfer_read_cb(GObject *source_object,
> > -                              GAsyncResult *res,
> > -                              gpointer user_data)
> > +static void file_xfer_read_async_cb(GObject *source_object,
> > +                                    GAsyncResult *res,
> > +                                    gpointer user_data)
> >  {
> > -    SpiceFileTransferTask *self = user_data;
> > -    SpiceMainChannel *channel = self->channel;
> > +    SpiceFileTransferTask *xfer_task;
> > +    SpiceMainChannel *channel;
> >      gssize count;
> > +    char *buffer;
> > +    GCancellable *cancellable;
> >      GError *error = NULL;
> >  
> > -    self->pending = FALSE;
> > -    count = g_input_stream_read_finish(G_INPUT_STREAM(self->file_stream),
> > -                                       res, &error);
> > -    /* Check for pending earlier errors */
> > -    if (self->error) {
> > -        spice_file_transfer_task_completed(self, error);
> > +    xfer_task = SPICE_FILE_TRANSFER_TASK(source_object);
> > +
> > +    channel = spice_file_transfer_task_get_channel(xfer_task);
> > +    count = spice_file_transfer_task_read_finish(xfer_task, res, &buffer,
> > &error);
> > +    if (count < 0) {
> > +        spice_channel_wakeup(SPICE_CHANNEL(channel), FALSE);
> > +        spice_file_transfer_task_completed(xfer_task, error);
> >          return;
> >      }
> >  
> > -    if (count > 0 || self->file_size == 0) {
> > -        GCancellable *cancellable;
> > -
> > -        self->read_bytes += count;
> > -        g_object_notify(G_OBJECT(self), "progress");
> > -        file_xfer_queue(self, count);
> > -        if (count == 0)
> > -            return;
> > -        cancellable = spice_file_transfer_task_get_cancellable(self);
> > -        file_xfer_flush_async(channel, cancellable,
> > -                              file_xfer_data_flushed_cb, self);
> > -        self->pending = TRUE;
> > -    } else if (error) {
> > -        spice_channel_wakeup(SPICE_CHANNEL(self->channel), FALSE);
> > -        spice_file_transfer_task_completed(self, error);
> > +    file_xfer_queue(xfer_task, count);
> > +    if (count == 0) {
> > +        /* on EOF just wait for VD_AGENT_FILE_XFER_STATUS from agent */
> > +        return;
> >      }
> > -    /* else EOF, do nothing (wait for VD_AGENT_FILE_XFER_STATUS from agent)
> > */
> > -}
> >  
> > -/* coroutine context */
> > -static void file_xfer_continue_read(SpiceFileTransferTask *self)
> > -{
> > -    g_input_stream_read_async(G_INPUT_STREAM(self->file_stream),
> > -                              self->buffer,
> > -                              FILE_XFER_CHUNK_SIZE,
> > -                              G_PRIORITY_DEFAULT,
> > -                              self->cancellable,
> > -                              file_xfer_read_cb,
> > -                              self);
> > -    self->pending = TRUE;
> > +    cancellable = spice_file_transfer_task_get_cancellable(xfer_task);
> > +    file_xfer_flush_async(channel, cancellable, file_xfer_data_flushed_cb,
> > xfer_task);
> >  }
> >  
> >  /* coroutine context */
> > @@ -2009,7 +1998,7 @@ static void
> > spice_file_transfer_task_handle_status(SpiceFileTransferTask *task,
> >                             "transfer received CAN_SEND_DATA in pending
> > state");
> >              break;
> >          }
> > -        file_xfer_continue_read(task);
> > +        spice_file_transfer_task_read_async(task, file_xfer_read_async_cb,
> > NULL);
> >          return;
> >      case VD_AGENT_FILE_XFER_STATUS_CANCELLED:
> >          error = g_error_new(SPICE_CLIENT_ERROR, SPICE_CLIENT_ERROR_FAILED,
> > @@ -3370,6 +3359,96 @@ static GFileInfo
> > *spice_file_transfer_task_init_task_finish(SpiceFileTransferTas
> >      return g_task_propagate_pointer(task, error);
> >  }
> >  
> > +static void spice_file_transfer_task_read_stream_cb(GObject *source_object,
> > +                                                    GAsyncResult *res,
> > +                                                    gpointer userdata)
> > +{
> > +    SpiceFileTransferTask *self;
> > +    GTask *task;
> > +    gssize nbytes;
> > +    GError *error = NULL;
> > +
> > +    task = G_TASK(userdata);
> > +    self = g_task_get_source_object(task);
> > +
> > +    g_return_if_fail(self->pending == TRUE);
> > +    self->pending = FALSE;
> > +
> > +    nbytes = g_input_stream_read_finish(G_INPUT_STREAM(self->file_stream),
> > res, &error);
> > +    if (self->error) {
> > +        /* On any pending error on SpiceFileTransferTask */
> > +        g_task_return_error(task, self->error);
> > +        return;
> > +    } else if (error) {
> > +        g_task_return_error(task, error);
> > +        return;
> > +    }
> > +
> > +    self->read_bytes += nbytes;
> > +
> > +    g_task_return_int(task, nbytes);
> > +}
> > +
> > +/* Any context */
> > +static void spice_file_transfer_task_read_async(SpiceFileTransferTask *self,
> > +                                                GAsyncReadyCallback callback,
> > +                                                gpointer userdata)
> > +{
> > +    GTask *task;
> > +
> > +    g_return_if_fail(self != NULL);
> > +    if (self->pending) {
> > +        g_task_report_new_error(self, callback, userdata,
> > +                                spice_file_transfer_task_read_async,
> > +                                SPICE_CLIENT_ERROR,
> > +                                SPICE_CLIENT_ERROR_FAILED,
> > +                                "Cannot read data in pending state");
> > +        return;
> > +    }
> > +
> > +    /* Notify the progress prior the read to make the info be related to the
> > +     * data that was already sent. To notify the 100% (completed), channel-
> > main
> > +     * should call read-async when it expects EOF. */
> > +    g_object_notify(G_OBJECT(self), "progress");
> > +
> > +    task = g_task_new(self, self->cancellable, callback, userdata);
> > +
> > +    if (self->read_bytes == self->file_size) {
> > +        /* channel-main might can request data after reading the whole file
> > as
> > +         * it expects EOF. Let's return immediately its request as we don't
> > want
> > +         * to reach a state where agent says file-transfer SUCCEED but we are
> > in
> > +         * a PENDING state in SpiceFileTransferTask due reading in idle */
> > +        g_task_return_int(task, 0);
> > +        return;
> > +    }
>
> As an alternative, could we also simply emit a "progress" notification in
> _task_completed() or something?

The check for EOF is not related to the progress notification but to a
race that might happen, which is:

1-) client reads all data from the file with _task_read_async();
2-) client flushes the data to the agent and requests to read more;
3-) stream_read_async() will return EOF, but asynchronously;
4-) the agent might send SUCCEED while stream_read_async() has not
    returned yet, meaning that the file-transfer is still pending, which
    triggers an error (can't succeed on pending)

This one took me a while to understand while testing a file-transfer
with 40+ files at the same time.

> If we keep this approach, there's a little comment issue to fix:
> "might can" -> "might"

Thanks! I'll fix it :)


>
> > +
> > +    self->pending = TRUE;
> > +    g_input_stream_read_async(G_INPUT_STREAM(self->file_stream),
> > +                              self->buffer,
> > +                              FILE_XFER_CHUNK_SIZE,
> > +                              G_PRIORITY_DEFAULT,
> > +                              self->cancellable,
> > +                              spice_file_transfer_task_read_stream_cb,
> > +                              task);
> > +}
> > +
> > +static gssize spice_file_transfer_task_read_finish(SpiceFileTransferTask
> > *self,
> > +                                                   GAsyncResult *result,
> > +                                                   char **buffer,
> > +                                                   GError **error)
> > +{
> > +    gssize nbytes;
> > +    GTask *task = G_TASK(result);
> > +
> > +    g_return_val_if_fail(self != NULL, -1);
> > +
> > +    nbytes = g_task_propagate_int(task, error);
> > +    if (nbytes >= 0 && buffer != NULL)
> > +        *buffer = self->buffer;
> > +
> > +    return nbytes;
> > +}
> > +
> >  static void
> >  spice_file_transfer_task_get_property(GObject *object,
> >                                        guint property_id,
> 
> 
> Acked-by: Jonathon Jongsma <jjongsma at redhat.com>

Cheers,
  toso
> _______________________________________________
> Spice-devel mailing list
> Spice-devel at lists.freedesktop.org
> https://lists.freedesktop.org/mailman/listinfo/spice-devel


More information about the Spice-devel mailing list