[Spice-devel] [PATCH spice-gtk] controller: async flush read/write

Marc-André Lureau marcandre.lureau at gmail.com
Mon Jul 2 18:41:21 PDT 2012


Windows named pipes behave a bit differently from Unix sockets, and may
return incomplete reads/writes. By using two read/write helpers, we try
to complete the operation before returning. Since the I/O operation may
be split over several calls, we make sure the buffer pointer is on the
heap. We use exceptions for the EOF or BROKEN_PIPE conditions, which
also simplifies the code.

To really work with named pipes, the giowin32streams also need to be
fixed to handle concurrent reads and writes properly; for details, see:
https://bugzilla.gnome.org/show_bug.cgi?id=679288
---
 gtk/controller/Makefile.am       |    1 +
 gtk/controller/controller.vala   |   37 +++++++----------------------
 gtk/controller/foreign-menu.vala |   48 ++++++++++++++------------------------
 gtk/controller/util.vala         |   42 +++++++++++++++++++++++++++++++++
 4 files changed, 69 insertions(+), 59 deletions(-)
 create mode 100644 gtk/controller/util.vala

diff --git a/gtk/controller/Makefile.am b/gtk/controller/Makefile.am
index 90fce40..7bfa51b 100644
--- a/gtk/controller/Makefile.am
+++ b/gtk/controller/Makefile.am
@@ -26,6 +26,7 @@ libspice_controller_la_VALASOURCES =		\
 	menu.vala				\
 	controller.vala				\
 	foreign-menu.vala			\
+	util.vala				\
 	$(NULL)
 
 libspice_controller_la_BUILT_SOURCES =			\
diff --git a/gtk/controller/controller.vala b/gtk/controller/controller.vala
index 185f5e0..24433ec 100644
--- a/gtk/controller/controller.vala
+++ b/gtk/controller/controller.vala
@@ -69,10 +69,10 @@ public class Controller: Object {
 		// message.
 		try {
 			if (excl_connection != null) {
-				yield excl_connection.output_stream.write_async (p);
+				yield output_stream_write (excl_connection.output_stream, p);
 			} else {
 				foreach (var c in clients)
-					yield c.output_stream.write_async (p);
+					yield output_stream_write (c.output_stream, p);
 			}
 		} catch (GLib.Error e) {
 			warning (e.message);
@@ -180,27 +180,19 @@ public class Controller: Object {
 	}
 
 	private async void handle_client (IOStream c) throws GLib.Error {
-		var init = SpiceProtocol.Controller.Init ();
 		var excl = false;
-		unowned uint8[] p = null;
 
 		debug ("new socket client, reading init header");
 
-		p = ((uint8[])(&init))[0:sizeof(SpiceProtocol.Controller.InitHeader)]; // FIXME vala
-		var read = yield c.input_stream.read_async (p);
-		if (warn_if (read != sizeof (SpiceProtocol.Controller.InitHeader)))
-			return;
+		var p = new uint8[sizeof(SpiceProtocol.Controller.Init)];
+		var init = (SpiceProtocol.Controller.Init*)p;
+		yield input_stream_read (c.input_stream, p);
 		if (warn_if (init.base.magic != SpiceProtocol.Controller.MAGIC))
 			return;
 		if (warn_if (init.base.version != SpiceProtocol.Controller.VERSION))
 			return;
 		if (warn_if (init.base.size < sizeof (SpiceProtocol.Controller.Init)))
 			return;
-
-		p = ((uint8[])(&init.credentials))[0:init.base.size - sizeof(SpiceProtocol.Controller.InitHeader)];
-		read = yield c.input_stream.read_async (p);
-		if (warn_if (read != (init.base.size - sizeof (SpiceProtocol.Controller.InitHeader))))
-			return;
 		if (warn_if (init.credentials != 0))
 			return;
 		if (warn_if (excl_connection != null))
@@ -217,29 +209,18 @@ public class Controller: Object {
 
 		client_connected ();
 
-		var t = new uint8[sizeof(SpiceProtocol.Controller.Msg)];
 		for (;;) {
-			read = yield c.input_stream.read_async (t[0:sizeof(SpiceProtocol.Controller.Msg)]);
-			if (read == 0)
-				break;
-
-			if (warn_if (read != sizeof (SpiceProtocol.Controller.Msg))) {
-				warning ("read only: " + read.to_string ());
-				break;
-			}
-
+			var t = new uint8[sizeof(SpiceProtocol.Controller.Msg)];
+			yield input_stream_read (c.input_stream, t);
 			var msg = (SpiceProtocol.Controller.Msg*)t;
+			debug ("new message " + msg.id.to_string () + "size " + msg.size.to_string ());
 			if (warn_if (msg.size < sizeof (SpiceProtocol.Controller.Msg)))
 				break;
 
 			if (msg.size > sizeof (SpiceProtocol.Controller.Msg)) {
 				t.resize ((int)msg.size);
 				msg = (SpiceProtocol.Controller.Msg*)t;
-				read = yield c.input_stream.read_async (t[sizeof(SpiceProtocol.Controller.Msg):msg.size]);
-				if (read == 0)
-					break;
-				if (warn_if (read != msg.size - sizeof(SpiceProtocol.Controller.Msg)))
-					break;
+				yield input_stream_read (c.input_stream, t[sizeof(SpiceProtocol.Controller.Msg):msg.size]);
 			}
 
 			handle_message (msg);
diff --git a/gtk/controller/foreign-menu.vala b/gtk/controller/foreign-menu.vala
index f2406bd..db2f353 100644
--- a/gtk/controller/foreign-menu.vala
+++ b/gtk/controller/foreign-menu.vala
@@ -70,14 +70,15 @@ public class ForeignMenu: Object {
 		send_msg (p);
 	}
 
-	public async bool send_msg (uint8[] p) throws GLib.Error {
+	public async bool send_msg (owned uint8[] p) throws GLib.Error {
 		// vala FIXME: pass Controller.Msg instead
 		// vala doesn't keep reference on the struct in async methods
 		// it copies only base, which is not enough to transmit the whole
 		// message.
 		try {
-			foreach (var c in clients)
-				yield c.output_stream.write_async (p);
+			foreach (var c in clients) {
+				yield output_stream_write (c.output_stream, p);
+			}
 		} catch (GLib.Error e) {
 			warning (e.message);
 		}
@@ -126,15 +127,11 @@ public class ForeignMenu: Object {
 	}
 
 	private async void handle_client (IOStream c) throws GLib.Error {
-		var header = SpiceProtocol.ForeignMenu.InitHeader ();
-		unowned uint8[] p = null;
-
 		debug ("new socket client, reading init header");
 
-		p = ((uint8[])(&header))[0:sizeof(SpiceProtocol.ForeignMenu.InitHeader)]; // FIXME vala
-		var read = yield c.input_stream.read_async (p);
-		if (warn_if (read != sizeof (SpiceProtocol.ForeignMenu.InitHeader)))
-			return;
+		var p = new uint8[sizeof(SpiceProtocol.ForeignMenu.InitHeader)];
+		var header = (SpiceProtocol.ForeignMenu.InitHeader*)p;
+		yield input_stream_read (c.input_stream, p);
 		if (warn_if (header.magic != SpiceProtocol.ForeignMenu.MAGIC))
 			return;
 		if (warn_if (header.version != SpiceProtocol.ForeignMenu.VERSION))
@@ -142,44 +139,33 @@ public class ForeignMenu: Object {
 		if (warn_if (header.size < sizeof (SpiceProtocol.ForeignMenu.Init)))
 			return;
 
-		uint64 credentials = 0;
-		p = ((uint8[])(&credentials))[0:sizeof(uint64)];
-		read = yield c.input_stream.read_async (p);
-		if (warn_if (read != sizeof(uint64)))
-			return;
+		var cp = new uint8[sizeof(uint64)];
+		yield input_stream_read (c.input_stream, cp);
+		uint64 credentials = *(uint64*)cp;
 		if (warn_if (credentials != 0))
 			return;
 
 		var title_size = header.size - sizeof(SpiceProtocol.ForeignMenu.Init);
 		var title = new uint8[title_size + 1];
-		read = yield c.input_stream.read_async (title[0:title_size]);
+		yield c.input_stream.read_async (title[0:title_size]);
 		this.title = (string)title;
 
 		client_connected ();
 
-		var t = new uint8[sizeof(SpiceProtocol.ForeignMenu.Msg)];
 		for (;;) {
-			read = yield c.input_stream.read_async (t[0:sizeof(SpiceProtocol.ForeignMenu.Msg)]);
-			if (read == 0)
-				break;
-
-			if (warn_if (read != sizeof (SpiceProtocol.ForeignMenu.Msg))) {
-				warning ("read only: " + read.to_string ());
-				break;
-			}
-
+			var t = new uint8[sizeof(SpiceProtocol.ForeignMenu.Msg)];
+			yield input_stream_read (c.input_stream, t);
 			var msg = (SpiceProtocol.ForeignMenu.Msg*)t;
+			debug ("new message " + msg.id.to_string () + "size " + msg.size.to_string ());
+
 			if (warn_if (msg.size < sizeof (SpiceProtocol.ForeignMenu.Msg)))
 				break;
 
 			if (msg.size > sizeof (SpiceProtocol.ForeignMenu.Msg)) {
 				t.resize ((int)msg.size);
 				msg = (SpiceProtocol.ForeignMenu.Msg*)t;
-				read = yield c.input_stream.read_async (t[sizeof(SpiceProtocol.ForeignMenu.Msg):msg.size]);
-				if (read == 0)
-					break;
-				if (warn_if (read != msg.size - sizeof(SpiceProtocol.ForeignMenu.Msg)))
-					break;
+
+				yield input_stream_read (c.input_stream, t[sizeof(SpiceProtocol.ForeignMenu.Msg):msg.size]);
 			}
 
 			handle_message (msg);
diff --git a/gtk/controller/util.vala b/gtk/controller/util.vala
new file mode 100644
index 0000000..16f546c
--- /dev/null
+++ b/gtk/controller/util.vala
@@ -0,0 +1,42 @@
+// Copyright (C) 2012 Red Hat, Inc.
+
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
+// Lesser General Public License for more details.
+
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, see <http://www.gnu.org/licenses/>.
+
+namespace SpiceCtrl {
+
+	public async void input_stream_read (InputStream stream, uint8[] buffer) throws GLib.IOError {
+		var length = buffer.length;
+		ssize_t i = 0;
+
+		while (i < length) {
+			var n = yield stream.read_async (buffer[i:length]);
+			if (n == 0)
+				throw new GLib.IOError.CLOSED ("closed stream") ;
+			i += n;
+		}
+	}
+
+	public async void output_stream_write (OutputStream stream, owned uint8[] buffer) throws GLib.IOError {
+		var length = buffer.length;
+		ssize_t i = 0;
+
+		while (i < length) {
+			var n = yield stream.write_async (buffer[i:length]);
+			if (n == 0)
+				throw new GLib.IOError.CLOSED ("closed stream") ;
+			i += n;
+		}
+	}
+
+}
\ No newline at end of file
-- 
1.7.10.2



More information about the Spice-devel mailing list