[0.11] gst-plugins-base: tcp: Only read as much as is currently available from the socket

Sebastian Dröge slomo at kemper.freedesktop.org
Tue Jan 17 03:08:48 PST 2012


Module: gst-plugins-base
Branch: 0.11
Commit: a649fe2d61f021e729373f3c48d9eea34ec9599e
URL:    http://cgit.freedesktop.org/gstreamer/gst-plugins-base/commit/?id=a649fe2d61f021e729373f3c48d9eea34ec9599e

Author: Sebastian Dröge <sebastian.droege at collabora.co.uk>
Date:   Tue Jan 17 11:44:20 2012 +0100

tcp: Only read as much as is currently available from the socket

---

 gst/tcp/gstmultisocketsink.c |    8 +++++-
 gst/tcp/gsttcpclientsrc.c    |   55 ++++++++++++++++++++++++++++++++++++++---
 gst/tcp/gsttcpserversrc.c    |   53 ++++++++++++++++++++++++++++++++++++++--
 3 files changed, 108 insertions(+), 8 deletions(-)

diff --git a/gst/tcp/gstmultisocketsink.c b/gst/tcp/gstmultisocketsink.c
index 4c6e0dd..110ef1c 100644
--- a/gst/tcp/gstmultisocketsink.c
+++ b/gst/tcp/gstmultisocketsink.c
@@ -1160,11 +1160,17 @@ gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
    * to write to us except for closing the socket, I guess it's because we
    * like to listen to our customers. */
   do {
+    gssize navail;
+
     GST_DEBUG_OBJECT (sink, "[socket %p] client wants us to read",
         client->socket);
 
+    navail = g_socket_get_available_bytes (client->socket);
+    if (navail < 0)
+      break;
+
     nread =
-        g_socket_receive (client->socket, dummy, sizeof (dummy),
+        g_socket_receive (client->socket, dummy, MIN (navail, sizeof (dummy)),
         sink->cancellable, &err);
     if (first && nread == 0) {
       /* client sent close, so remove it */
diff --git a/gst/tcp/gsttcpclientsrc.c b/gst/tcp/gsttcpclientsrc.c
index f8ed6b9..2ea6dc0 100644
--- a/gst/tcp/gsttcpclientsrc.c
+++ b/gst/tcp/gsttcpclientsrc.c
@@ -180,6 +180,7 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
   gssize rret;
   GError *err = NULL;
   guint8 *data;
+  gssize avail, read;
 
   src = GST_TCP_CLIENT_SRC (psrc);
 
@@ -189,16 +190,48 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
   GST_LOG_OBJECT (src, "asked for a buffer");
 
   /* read the buffer header */
-  *outbuf = gst_buffer_new_and_alloc (MAX_READ_SIZE);
+  avail = g_socket_get_available_bytes (src->socket);
+  if (avail < 0) {
+    goto get_available_error;
+  } else if (avail == 0) {
+    GIOCondition condition;
+
+    if (!g_socket_condition_wait (src->socket,
+            G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, src->cancellable, &err))
+      goto select_error;
+
+    condition =
+        g_socket_condition_check (src->socket,
+        G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
+
+    if ((condition & G_IO_ERR)) {
+      GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+          ("Socket in error state"));
+      *outbuf = NULL;
+      ret = GST_FLOW_ERROR;
+      goto done;
+    } else if ((condition & G_IO_HUP)) {
+      GST_DEBUG_OBJECT (src, "Connection closed");
+      *outbuf = NULL;
+      ret = GST_FLOW_EOS;
+      goto done;
+    }
+    avail = g_socket_get_available_bytes (src->socket);
+    if (avail <= 0)
+      goto get_available_error;
+  }
+
+  read = MIN (avail, MAX_READ_SIZE);
+  *outbuf = gst_buffer_new_and_alloc (read);
   data = gst_buffer_map (*outbuf, NULL, NULL, GST_MAP_READWRITE);
   rret =
-      g_socket_receive (src->socket, (gchar *) data, MAX_READ_SIZE,
+      g_socket_receive (src->socket, (gchar *) data, read,
       src->cancellable, &err);
 
   if (rret == 0) {
     GST_DEBUG_OBJECT (src, "Connection closed");
     ret = GST_FLOW_EOS;
-    gst_buffer_unmap (*outbuf, data, MAX_READ_SIZE);
+    gst_buffer_unmap (*outbuf, data, read);
     gst_buffer_unref (*outbuf);
     *outbuf = NULL;
   } else if (ret < 0) {
@@ -210,7 +243,7 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
       GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
           ("Failed to read from socket: %s", err->message));
     }
-    gst_buffer_unmap (*outbuf, data, MAX_READ_SIZE);
+    gst_buffer_unmap (*outbuf, data, read);
     gst_buffer_unref (*outbuf);
     *outbuf = NULL;
   } else {
@@ -228,8 +261,22 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
   }
   g_clear_error (&err);
 
+done:
   return ret;
 
+select_error:
+  {
+    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+        ("Select failed: %s", err->message));
+    g_clear_error (&err);
+    return GST_FLOW_ERROR;
+  }
+get_available_error:
+  {
+    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+        ("Select to get available bytes from socket"));
+    return GST_FLOW_ERROR;
+  }
 wrong_state:
   {
     GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
diff --git a/gst/tcp/gsttcpserversrc.c b/gst/tcp/gsttcpserversrc.c
index 0cdfdd2..a217e07 100644
--- a/gst/tcp/gsttcpserversrc.c
+++ b/gst/tcp/gsttcpserversrc.c
@@ -162,7 +162,8 @@ gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
 {
   GstTCPServerSrc *src;
   GstFlowReturn ret = GST_FLOW_OK;
-  gssize rret;
+  gssize rret, avail;
+  gsize read;
   GError *err = NULL;
   guint8 *data;
 
@@ -184,10 +185,42 @@ gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
   GST_LOG_OBJECT (src, "asked for a buffer");
 
   /* read the buffer header */
-  *outbuf = gst_buffer_new_and_alloc (MAX_READ_SIZE);
+  avail = g_socket_get_available_bytes (src->client_socket);
+  if (avail < 0) {
+    goto get_available_error;
+  } else if (avail == 0) {
+    GIOCondition condition;
+
+    if (!g_socket_condition_wait (src->client_socket,
+            G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, src->cancellable, &err))
+      goto select_error;
+
+    condition =
+        g_socket_condition_check (src->client_socket,
+        G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
+
+    if ((condition & G_IO_ERR)) {
+      GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+          ("Socket in error state"));
+      *outbuf = NULL;
+      ret = GST_FLOW_ERROR;
+      goto done;
+    } else if ((condition & G_IO_HUP)) {
+      GST_DEBUG_OBJECT (src, "Connection closed");
+      *outbuf = NULL;
+      ret = GST_FLOW_EOS;
+      goto done;
+    }
+    avail = g_socket_get_available_bytes (src->client_socket);
+    if (avail <= 0)
+      goto get_available_error;
+  }
+
+  read = MIN (avail, MAX_READ_SIZE);
+  *outbuf = gst_buffer_new_and_alloc (read);
   data = gst_buffer_map (*outbuf, NULL, NULL, GST_MAP_READWRITE);
   rret =
-      g_socket_receive (src->client_socket, (gchar *) data, MAX_READ_SIZE,
+      g_socket_receive (src->client_socket, (gchar *) data, read,
       src->cancellable, &err);
 
   if (rret == 0) {
@@ -223,6 +256,7 @@ gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
   }
   g_clear_error (&err);
 
+done:
   return ret;
 
 wrong_state:
@@ -241,6 +275,19 @@ accept_error:
     g_clear_error (&err);
     return GST_FLOW_ERROR;
   }
+select_error:
+  {
+    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+        ("Select failed: %s", err->message));
+    g_clear_error (&err);
+    return GST_FLOW_ERROR;
+  }
+get_available_error:
+  {
+    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+        ("Select to get available bytes from socket"));
+    return GST_FLOW_ERROR;
+  }
 }
 
 static void



More information about the gstreamer-commits mailing list