[pulseaudio-commits] [Git][pulseaudio/pulseaudio][master] rtp: Fix sending of small packets

Arun Raghavan gitlab at gitlab.freedesktop.org
Mon Sep 21 13:00:14 UTC 2020



Arun Raghavan pushed to branch master at PulseAudio / pulseaudio


Commits:
410db7d2 by Sanchayan Maity at 2020-09-21T10:06:48+05:30
rtp: Fix sending of small packets

The current implementation for RTP send isn't optimised for sending MTU
bytes of data like rtp-native. For eg. if MTU is 1280 bytes and we have
to send 1276 bytes, two packets are send out one of 1268 bytes and other
of 8 bytes. Sending out a packet of 8 bytes has a significant overhead
and we should be sending MTU bytes of data.

Fix this by accumulating MTU bytes of data and sending data only on
accumulation of MTU worth of data.

- - - - -


1 changed file:

- src/modules/rtp/rtp-gstreamer.c


Changes:

=====================================
src/modules/rtp/rtp-gstreamer.c
=====================================
@@ -43,6 +43,7 @@
     }
 
 #define MAKE_ELEMENT(v, e) MAKE_ELEMENT_NAMED((v), (e), NULL)
+#define RTP_HEADER_SIZE    12
 
 struct pa_rtp_context {
     pa_fdsem *fdsem;
@@ -53,6 +54,9 @@ struct pa_rtp_context {
     GstElement *appsink;
 
     uint32_t last_timestamp;
+
+    uint8_t *send_buf;
+    size_t mtu;
 };
 
 static GstCaps* caps_from_sample_spec(const pa_sample_spec *ss) {
@@ -171,6 +175,8 @@ pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, con
     c = pa_xnew0(pa_rtp_context, 1);
 
     c->ss = *ss;
+    c->mtu = mtu - RTP_HEADER_SIZE;
+    c->send_buf = pa_xmalloc(c->mtu);
 
     if (!gst_init_check(NULL, NULL, &error)) {
         pa_log_error("Could not initialise GStreamer: %s", error->message);
@@ -216,18 +222,10 @@ static bool process_bus_messages(pa_rtp_context *c) {
     return ret;
 }
 
-static void free_buffer(pa_memblock *memblock) {
-    pa_memblock_release(memblock);
-    pa_memblock_unref(memblock);
-}
-
 /* Called from I/O thread context */
 int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) {
-    pa_memchunk chunk = { 0, };
     GstBuffer *buf;
-    void *data;
-    bool stop = false;
-    int ret = 0;
+    size_t n = 0;
 
     pa_assert(c);
     pa_assert(q);
@@ -235,40 +233,81 @@ int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) {
     if (!process_bus_messages(c))
         return -1;
 
-    while (!stop && pa_memblockq_peek(q, &chunk) == 0) {
-        GstClock *clock;
-        GstClockTime timestamp, clock_time;
+    /*
+     * While we check here for atleast MTU worth of data being available in
+     * memblockq, we might not have exact equivalent to MTU. Hence, we walk
+     * over the memchunks in memblockq and accumulate MTU bytes next.
+     */
+    if (pa_memblockq_get_length(q) < c->mtu)
+        return 0;
+
+    for (;;) {
+        pa_memchunk chunk;
+        int r;
+
+        pa_memchunk_reset(&chunk);
+
+        if ((r = pa_memblockq_peek(q, &chunk)) >= 0) {
+            /*
+             * Accumulate MTU bytes of data before sending. If the current
+             * chunk length + accumulated bytes exceeds MTU, we drop bytes
+             * considered for transfer in this iteration from memblockq.
+             *
+             * The remaining bytes will be available in the next iteration,
+             * as these will be tracked and maintained by memblockq.
+             */
+            size_t k = n + chunk.length > c->mtu ? c->mtu - n : chunk.length;
+
+            pa_assert(chunk.memblock);
+
+            memcpy(c->send_buf + n, pa_memblock_acquire_chunk(&chunk), k);
+            pa_memblock_release(chunk.memblock);
+            pa_memblock_unref(chunk.memblock);
+
+            n += k;
+            pa_memblockq_drop(q, k);
+        }
 
-        clock = gst_element_get_clock(c->pipeline);
-        clock_time = gst_clock_get_time(clock);
-        gst_object_unref(clock);
+        if (r < 0 || n >= c->mtu) {
+            GstClock *clock;
+            GstClockTime timestamp, clock_time;
+            GstMapInfo info;
 
-        timestamp = gst_element_get_base_time(c->pipeline);
-        if (timestamp > clock_time)
-          timestamp -= clock_time;
-        else
-          timestamp = 0;
+            if (n > 0) {
+                clock = gst_element_get_clock(c->pipeline);
+                clock_time = gst_clock_get_time(clock);
+                gst_object_unref(clock);
 
-        pa_assert(chunk.memblock);
+                timestamp = gst_element_get_base_time(c->pipeline);
+                if (timestamp > clock_time)
+                  timestamp -= clock_time;
+                else
+                  timestamp = 0;
 
-        data = pa_memblock_acquire(chunk.memblock);
+                buf = gst_buffer_new_allocate(NULL, n, NULL);
+                pa_assert(buf);
 
-        buf = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_READONLY | GST_MEMORY_FLAG_PHYSICALLY_CONTIGUOUS,
-                                          data, chunk.length, chunk.index, chunk.length, chunk.memblock,
-                                          (GDestroyNotify) free_buffer);
+                GST_BUFFER_PTS(buf) = timestamp;
 
-        GST_BUFFER_PTS(buf) = timestamp;
+                pa_assert_se(gst_buffer_map(buf, &info, GST_MAP_WRITE));
 
-        if (gst_app_src_push_buffer(GST_APP_SRC(c->appsrc), buf) != GST_FLOW_OK) {
-            pa_log_error("Could not push buffer");
-            stop = true;
-            ret = -1;
-        }
+                memcpy(info.data, c->send_buf, n);
+                gst_buffer_unmap(buf, &info);
 
-        pa_memblockq_drop(q, chunk.length);
+                if (gst_app_src_push_buffer(GST_APP_SRC(c->appsrc), buf) != GST_FLOW_OK) {
+                    pa_log_error("Could not push buffer");
+                    return -1;
+                }
+            }
+
+            if (r < 0 || pa_memblockq_get_length(q) < c->mtu)
+                break;
+
+            n = 0;
+        }
     }
 
-    return ret;
+    return 0;
 }
 
 static GstCaps* rtp_caps_from_sample_spec(const pa_sample_spec *ss) {
@@ -415,6 +454,7 @@ pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, const pa_sample
 
     c->fdsem = pa_fdsem_new();
     c->ss = *ss;
+    c->send_buf = NULL;
 
     if (!gst_init_check(NULL, NULL, &error)) {
         pa_log_error("Could not initialise GStreamer: %s", error->message);
@@ -537,6 +577,7 @@ void pa_rtp_context_free(pa_rtp_context *c) {
     if (c->appsrc) {
         gst_app_src_end_of_stream(GST_APP_SRC(c->appsrc));
         gst_object_unref(c->appsrc);
+        pa_xfree(c->send_buf);
     }
 
     if (c->appsink)



View it on GitLab: https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/commit/410db7d21651877dc15936f5449784e07b55fb01

-- 
View it on GitLab: https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/commit/410db7d21651877dc15936f5449784e07b55fb01
You're receiving this email because of your account on gitlab.freedesktop.org.


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.freedesktop.org/archives/pulseaudio-commits/attachments/20200921/2e7a0ae8/attachment-0001.htm>


More information about the pulseaudio-commits mailing list