[Spice-devel] [PATCH spice-gtk] controller: async flush read/write
Alon Levy
alevy at redhat.com
Tue Jul 3 01:39:49 PDT 2012
On Tue, Jul 03, 2012 at 03:41:21AM +0200, Marc-André Lureau wrote:
> Windows namedpipes behave a bit differently from Unix socket, and may
> return incomplete read/write. By using 2 read/write() helpers, try to
> complete the operation before returning. Since the IO operation may be
> splitted over several call, we make sure the buffer pointer is on the
> heap. We use exception for EOF or BROKEN_PIPE condition, which also
> simplifies the code.
Looks good to me.
Can't see who handles the thrown CLOSED exception but I guess it's
somewhere.
>
> To really work with namedpipe, the giowin32streams need to be fixed as
> well to handle concurrent read & write properly, see for details:
> https://bugzilla.gnome.org/show_bug.cgi?id=679288
> ---
> gtk/controller/Makefile.am | 1 +
> gtk/controller/controller.vala | 37 +++++++----------------------
> gtk/controller/foreign-menu.vala | 48 ++++++++++++++------------------------
> gtk/controller/util.vala | 42 +++++++++++++++++++++++++++++++++
> 4 files changed, 69 insertions(+), 59 deletions(-)
> create mode 100644 gtk/controller/util.vala
>
> diff --git a/gtk/controller/Makefile.am b/gtk/controller/Makefile.am
> index 90fce40..7bfa51b 100644
> --- a/gtk/controller/Makefile.am
> +++ b/gtk/controller/Makefile.am
> @@ -26,6 +26,7 @@ libspice_controller_la_VALASOURCES = \
> menu.vala \
> controller.vala \
> foreign-menu.vala \
> + util.vala \
> $(NULL)
>
> libspice_controller_la_BUILT_SOURCES = \
> diff --git a/gtk/controller/controller.vala b/gtk/controller/controller.vala
> index 185f5e0..24433ec 100644
> --- a/gtk/controller/controller.vala
> +++ b/gtk/controller/controller.vala
> @@ -69,10 +69,10 @@ public class Controller: Object {
> // message.
> try {
> if (excl_connection != null) {
> - yield excl_connection.output_stream.write_async (p);
> + yield output_stream_write (excl_connection.output_stream, p);
> } else {
> foreach (var c in clients)
> - yield c.output_stream.write_async (p);
> + yield output_stream_write (c.output_stream, p);
> }
> } catch (GLib.Error e) {
> warning (e.message);
> @@ -180,27 +180,19 @@ public class Controller: Object {
> }
>
> private async void handle_client (IOStream c) throws GLib.Error {
> - var init = SpiceProtocol.Controller.Init ();
> var excl = false;
> - unowned uint8[] p = null;
>
> debug ("new socket client, reading init header");
>
> - p = ((uint8[])(&init))[0:sizeof(SpiceProtocol.Controller.InitHeader)]; // FIXME vala
> - var read = yield c.input_stream.read_async (p);
> - if (warn_if (read != sizeof (SpiceProtocol.Controller.InitHeader)))
> - return;
> + var p = new uint8[sizeof(SpiceProtocol.Controller.Init)];
> + var init = (SpiceProtocol.Controller.Init*)p;
> + yield input_stream_read (c.input_stream, p);
> if (warn_if (init.base.magic != SpiceProtocol.Controller.MAGIC))
> return;
> if (warn_if (init.base.version != SpiceProtocol.Controller.VERSION))
> return;
> if (warn_if (init.base.size < sizeof (SpiceProtocol.Controller.Init)))
> return;
> -
> - p = ((uint8[])(&init.credentials))[0:init.base.size - sizeof(SpiceProtocol.Controller.InitHeader)];
> - read = yield c.input_stream.read_async (p);
> - if (warn_if (read != (init.base.size - sizeof (SpiceProtocol.Controller.InitHeader))))
> - return;
> if (warn_if (init.credentials != 0))
> return;
> if (warn_if (excl_connection != null))
> @@ -217,29 +209,18 @@ public class Controller: Object {
>
> client_connected ();
>
> - var t = new uint8[sizeof(SpiceProtocol.Controller.Msg)];
> for (;;) {
> - read = yield c.input_stream.read_async (t[0:sizeof(SpiceProtocol.Controller.Msg)]);
> - if (read == 0)
> - break;
> -
> - if (warn_if (read != sizeof (SpiceProtocol.Controller.Msg))) {
> - warning ("read only: " + read.to_string ());
> - break;
> - }
> -
> + var t = new uint8[sizeof(SpiceProtocol.Controller.Msg)];
> + yield input_stream_read (c.input_stream, t);
> var msg = (SpiceProtocol.Controller.Msg*)t;
> + debug ("new message " + msg.id.to_string () + "size " + msg.size.to_string ());
> if (warn_if (msg.size < sizeof (SpiceProtocol.Controller.Msg)))
> break;
>
> if (msg.size > sizeof (SpiceProtocol.Controller.Msg)) {
> t.resize ((int)msg.size);
> msg = (SpiceProtocol.Controller.Msg*)t;
> - read = yield c.input_stream.read_async (t[sizeof(SpiceProtocol.Controller.Msg):msg.size]);
> - if (read == 0)
> - break;
> - if (warn_if (read != msg.size - sizeof(SpiceProtocol.Controller.Msg)))
> - break;
> + yield input_stream_read (c.input_stream, t[sizeof(SpiceProtocol.Controller.Msg):msg.size]);
> }
>
> handle_message (msg);
> diff --git a/gtk/controller/foreign-menu.vala b/gtk/controller/foreign-menu.vala
> index f2406bd..db2f353 100644
> --- a/gtk/controller/foreign-menu.vala
> +++ b/gtk/controller/foreign-menu.vala
> @@ -70,14 +70,15 @@ public class ForeignMenu: Object {
> send_msg (p);
> }
>
> - public async bool send_msg (uint8[] p) throws GLib.Error {
> + public async bool send_msg (owned uint8[] p) throws GLib.Error {
> // vala FIXME: pass Controller.Msg instead
> // vala doesn't keep reference on the struct in async methods
> // it copies only base, which is not enough to transmit the whole
> // message.
> try {
> - foreach (var c in clients)
> - yield c.output_stream.write_async (p);
> + foreach (var c in clients) {
> + yield output_stream_write (c.output_stream, p);
> + }
> } catch (GLib.Error e) {
> warning (e.message);
> }
> @@ -126,15 +127,11 @@ public class ForeignMenu: Object {
> }
>
> private async void handle_client (IOStream c) throws GLib.Error {
> - var header = SpiceProtocol.ForeignMenu.InitHeader ();
> - unowned uint8[] p = null;
> -
> debug ("new socket client, reading init header");
>
> - p = ((uint8[])(&header))[0:sizeof(SpiceProtocol.ForeignMenu.InitHeader)]; // FIXME vala
> - var read = yield c.input_stream.read_async (p);
> - if (warn_if (read != sizeof (SpiceProtocol.ForeignMenu.InitHeader)))
> - return;
> + var p = new uint8[sizeof(SpiceProtocol.ForeignMenu.InitHeader)];
> + var header = (SpiceProtocol.ForeignMenu.InitHeader*)p;
> + yield input_stream_read (c.input_stream, p);
> if (warn_if (header.magic != SpiceProtocol.ForeignMenu.MAGIC))
> return;
> if (warn_if (header.version != SpiceProtocol.ForeignMenu.VERSION))
> @@ -142,44 +139,33 @@ public class ForeignMenu: Object {
> if (warn_if (header.size < sizeof (SpiceProtocol.ForeignMenu.Init)))
> return;
>
> - uint64 credentials = 0;
> - p = ((uint8[])(&credentials))[0:sizeof(uint64)];
> - read = yield c.input_stream.read_async (p);
> - if (warn_if (read != sizeof(uint64)))
> - return;
> + var cp = new uint8[sizeof(uint64)];
> + yield input_stream_read (c.input_stream, cp);
> + uint64 credentials = *(uint64*)cp;
> if (warn_if (credentials != 0))
> return;
>
> var title_size = header.size - sizeof(SpiceProtocol.ForeignMenu.Init);
> var title = new uint8[title_size + 1];
> - read = yield c.input_stream.read_async (title[0:title_size]);
> + yield c.input_stream.read_async (title[0:title_size]);
> this.title = (string)title;
>
> client_connected ();
>
> - var t = new uint8[sizeof(SpiceProtocol.ForeignMenu.Msg)];
> for (;;) {
> - read = yield c.input_stream.read_async (t[0:sizeof(SpiceProtocol.ForeignMenu.Msg)]);
> - if (read == 0)
> - break;
> -
> - if (warn_if (read != sizeof (SpiceProtocol.ForeignMenu.Msg))) {
> - warning ("read only: " + read.to_string ());
> - break;
> - }
> -
> + var t = new uint8[sizeof(SpiceProtocol.ForeignMenu.Msg)];
> + yield input_stream_read (c.input_stream, t);
> var msg = (SpiceProtocol.ForeignMenu.Msg*)t;
> + debug ("new message " + msg.id.to_string () + "size " + msg.size.to_string ());
> +
> if (warn_if (msg.size < sizeof (SpiceProtocol.ForeignMenu.Msg)))
> break;
>
> if (msg.size > sizeof (SpiceProtocol.ForeignMenu.Msg)) {
> t.resize ((int)msg.size);
> msg = (SpiceProtocol.ForeignMenu.Msg*)t;
> - read = yield c.input_stream.read_async (t[sizeof(SpiceProtocol.ForeignMenu.Msg):msg.size]);
> - if (read == 0)
> - break;
> - if (warn_if (read != msg.size - sizeof(SpiceProtocol.ForeignMenu.Msg)))
> - break;
> +
> + yield input_stream_read (c.input_stream, t[sizeof(SpiceProtocol.ForeignMenu.Msg):msg.size]);
> }
>
> handle_message (msg);
> diff --git a/gtk/controller/util.vala b/gtk/controller/util.vala
> new file mode 100644
> index 0000000..16f546c
> --- /dev/null
> +++ b/gtk/controller/util.vala
> @@ -0,0 +1,42 @@
> +// Copyright (C) 2012 Red Hat, Inc.
> +
> +// This library is free software; you can redistribute it and/or
> +// modify it under the terms of the GNU Lesser General Public
> +// License as published by the Free Software Foundation; either
> +// version 2.1 of the License, or (at your option) any later version.
> +
> +// This library is distributed in the hope that it will be useful,
> +// but WITHOUT ANY WARRANTY; without even the implied warranty of
> +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> +// Lesser General Public License for more details.
> +
> +// You should have received a copy of the GNU Lesser General Public
> +// License along with this library; if not, see <http://www.gnu.org/licenses/>.
> +
> +namespace SpiceCtrl {
> +
> + public async void input_stream_read (InputStream stream, uint8[] buffer) throws GLib.IOError {
> + var length = buffer.length;
> + ssize_t i = 0;
> +
> + while (i < length) {
> + var n = yield stream.read_async (buffer[i:length]);
> + if (n == 0)
> + throw new GLib.IOError.CLOSED ("closed stream") ;
> + i += n;
> + }
> + }
> +
> + public async void output_stream_write (OutputStream stream, owned uint8[] buffer) throws GLib.IOError {
> + var length = buffer.length;
> + ssize_t i = 0;
> +
> + while (i < length) {
> + var n = yield stream.write_async (buffer[i:length]);
> + if (n == 0)
> + throw new GLib.IOError.CLOSED ("closed stream") ;
> + i += n;
> + }
> + }
> +
> +}
> \ No newline at end of file
> --
> 1.7.10.2
>
> _______________________________________________
> Spice-devel mailing list
> Spice-devel at lists.freedesktop.org
> http://lists.freedesktop.org/mailman/listinfo/spice-devel
More information about the Spice-devel
mailing list