[Spice-devel] [PATCH spice-gtk v2] webdav: don't buffer input from phodav

Frediano Ziglio fziglio at redhat.com
Wed Jul 3 10:53:18 UTC 2019


> 
> The current approach with OutputQueue in webdav has several problems:
> 
> * if the connection is slow, webdav keeps reading from phodav
> and pushing messages to the internal channel xmit_queue.
> This way, the queue can grow very quickly and the whole file
> that is being transferred using webdav essentially gets loaded into memory.
> 
> * spice channel first flushes all messages in the xmit_queue and
> then proceeds to reading. If webdav floods the xmit_queue with
> a ton of messages, spice channel does not leave iterate_write until
> the queue gets empty. This way, reading from the channel is blocked
> till the whole file is transferred.
> 
> * OutputQueue uses g_output_stream_flush_async() on SpiceVmcOutputStream
> that does not implement flush
> 
> To solve these issues, don't read from phodav until the last message
> for a given client is written out to the socket.
> (main channel currently uses the same approach when transferring files)
> 
> OutputQueue used an idle function to schedule the write and then
> called mux_pushed_cb which started reading from phodav with priority
> G_PRIORITY_DEFAULT.
> Since this new approach does not utilize the idle scheduling,
> lower the priority in client_start_read() to G_PRIORITY_DEFAULT_IDLE
> to make sure other sources with lower priority get dispatched as well
> (e.g. signals from coroutines, redrawing and resizing operations).
> 
> Also implement spice_webdav_channel_reset(). This is necessary because
> spice_vmc_write_async() references the channel. If the channel is to be
> disconnected, the write operations need to be cancelled so that
> the references to the channel are released asap. Otherwise,
> spice session would be stuck waiting for the channel to finalize.
> 
> Signed-off-by: Jakub Janků <jjanku at redhat.com>

Acked

> ---
> Changes since v1:
> * updated commit log and comment
> ---
>  src/channel-webdav.c | 43 ++++++++++++++++++++++++++++++++-----------
>  1 file changed, 32 insertions(+), 11 deletions(-)
> 
> diff --git a/src/channel-webdav.c b/src/channel-webdav.c
> index 0fa0d69..14d4e05 100644
> --- a/src/channel-webdav.c
> +++ b/src/channel-webdav.c
> @@ -45,13 +45,12 @@
>   * Since: 0.24
>   */
>  
> -typedef struct _OutputQueue OutputQueue;
> +/* typedef struct _OutputQueue OutputQueue; */
>  
>  struct _SpiceWebdavChannelPrivate {
>      SpiceVmcStream *stream;
>      GCancellable *cancellable;
>      GHashTable *clients;
> -    OutputQueue *queue;
>  
>      gboolean demuxing;
>      struct _demux {
> @@ -65,6 +64,7 @@ G_DEFINE_TYPE_WITH_PRIVATE(SpiceWebdavChannel,
> spice_webdav_channel, SPICE_TYPE_
>  
>  static void spice_webdav_handle_msg(SpiceChannel *channel, SpiceMsgIn *msg);
>  
> +#if 0
>  struct _OutputQueue {
>      GOutputStream *output;
>      gboolean flushing;
> @@ -178,6 +178,7 @@ static void output_queue_push(OutputQueue *q, const
> guint8 *buf, gsize size,
>      if (!q->idle_id && !q->flushing)
>          q->idle_id = g_idle_add(output_queue_idle, q);
>  }
> +#endif
>  
>  #define MAX_MUX_SIZE G_MAXUINT16
>  
> @@ -227,11 +228,15 @@ static void remove_client(Client *client)
>      g_hash_table_remove(client->self->priv->clients, &client->id);
>  }
>  
> -static void mux_pushed_cb(OutputQueue *q, gpointer user_data)
> +static void
> +mux_msg_flushed_cb(GObject *source_object,
> +                   GAsyncResult *result,
> +                   gpointer user_data)
>  {
>      Client *client = user_data;
>  
> -    if (client->mux.size == 0 ||
> +    if (!g_output_stream_write_all_finish(G_OUTPUT_STREAM(source_object),
> result, NULL, NULL) ||
> +        client->mux.size == 0 ||
>          !client_start_read(client)) {
>          remove_client(client);
>      }
> @@ -245,6 +250,7 @@ static void server_reply_cb(GObject *source_object,
>  {
>      Client *client = user_data;
>      SpiceWebdavChannelPrivate *c = client->self->priv;
> +    GOutputStream *mux_out;
>      GError *err = NULL;
>      gssize size;
>  
> @@ -256,8 +262,12 @@ static void server_reply_cb(GObject *source_object,
>      g_return_if_fail(size >= 0);
>      client->mux.size = GUINT16_TO_LE(size);
>  
> -    output_queue_push(c->queue, (guint8 *)&client->mux, sizeof(gint64) +
> sizeof(guint16) + size,
> -                      (GFunc)mux_pushed_cb, client);
> +    mux_out = g_io_stream_get_output_stream(G_IO_STREAM(c->stream));
> +
> +    /* this internally uses spice_vmc_write_async(), priority is ignored;
> +     * the callback is invoked once the msg is written out to the socket */
> +    g_output_stream_write_all_async(mux_out, (guint8 *)&client->mux,
> sizeof(gint64) + sizeof(guint16) + size,
> +        G_PRIORITY_DEFAULT, client->cancellable, mux_msg_flushed_cb,
> client);
>  
>      return;
>  
> @@ -280,8 +290,10 @@ static bool client_start_read(Client *client)
>      if (g_input_stream_is_closed(input)) {
>          return false;
>      }
> +    /* use G_PRIORITY_DEFAULT_IDLE to make sure
> +     * other low-priority sources get dispatched as well */
>      g_input_stream_read_async(input, client->mux.buf, MAX_MUX_SIZE,
> -                              G_PRIORITY_DEFAULT, client->cancellable,
> server_reply_cb,
> +                              G_PRIORITY_DEFAULT_IDLE, client->cancellable,
> server_reply_cb,
>                                client_ref(client));
>      return true;
>  }
> @@ -535,9 +547,6 @@ static void spice_webdav_channel_init(SpiceWebdavChannel
> *channel)
>      c->clients = g_hash_table_new_full(g_int64_hash, g_int64_equal,
>                                         NULL, client_remove_unref);
>      c->demux.buf = g_malloc0(MAX_MUX_SIZE);
> -
> -    GOutputStream *ostream =
> g_io_stream_get_output_stream(G_IO_STREAM(c->stream));
> -    c->queue = output_queue_new(ostream);
>  }
>  
>  static void spice_webdav_channel_finalize(GObject *object)
> @@ -555,7 +564,6 @@ static void spice_webdav_channel_dispose(GObject *object)
>  
>      g_cancellable_cancel(c->cancellable);
>      g_clear_object(&c->cancellable);
> -    g_clear_pointer(&c->queue, output_queue_free);
>      g_clear_object(&c->stream);
>      g_hash_table_unref(c->clients);
>  
> @@ -567,6 +575,18 @@ static void spice_webdav_channel_up(SpiceChannel
> *channel)
>      CHANNEL_DEBUG(channel, "up");
>  }
>  
> +static void spice_webdav_channel_reset(SpiceChannel *channel, gboolean
> migrating)
> +{
> +    SpiceWebdavChannelPrivate *c;
> +    c =
> spice_webdav_channel_get_instance_private(SPICE_WEBDAV_CHANNEL(channel));
> +
> +    g_cancellable_cancel(c->cancellable);
> +    c->demuxing = FALSE;
> +    g_hash_table_remove_all(c->clients);
> +
> +
> SPICE_CHANNEL_CLASS(spice_webdav_channel_parent_class)->channel_reset(channel,
> migrating);
> +}
> +
>  static void spice_webdav_channel_class_init(SpiceWebdavChannelClass *klass)
>  {
>      GObjectClass *gobject_class = G_OBJECT_CLASS(klass);
> @@ -576,6 +596,7 @@ static void
> spice_webdav_channel_class_init(SpiceWebdavChannelClass *klass)
>      gobject_class->finalize     = spice_webdav_channel_finalize;
>      channel_class->handle_msg   = spice_webdav_handle_msg;
>      channel_class->channel_up   = spice_webdav_channel_up;
> +    channel_class->channel_reset = spice_webdav_channel_reset;
>  
>      g_signal_override_class_handler("port-event",
>                                      SPICE_TYPE_WEBDAV_CHANNEL,

Frediano


More information about the Spice-devel mailing list