<div dir="ltr">Hi, <div><br></div><div>    I am currently exploring the possibilities of using GStreamer for a project, a territory that is relatively new to me. I am convinced, however, that GStreamer is an ideal fit for my objectives.</div><div><br>The task at hand is to extract server timestamps from RTCP packets. For this, I referred to the article,<a href="https://medium.com/@Qorela/absolue-timestamp-of-arbitrary-frame-in-rtsp-stream-with-gstreamer-5a98c3057843"> Absolute Timestamp of Arbitrary Frame in RTSP Stream with GStreamer</a></div><br>  I have been running into some difficulties during the implementation phase.<br><br>I am experiencing an inconsistent triggering of the "on-receiving-rtcp" event. It appears to get activated only sporadically, approximately once in every 20 attempts. This inconsistency makes it challenging for me to proceed effectively with the project.<br><br>In addition, an error message always presents itself: "Element manager already has a pad named recv_rtcp_sink_0, the behaviour of gst_element_get_request_pad() for existing pads is undefined!". This error is making the situation even more complex to understand and resolve.<br><br>I would truly appreciate it if you could provide some insights or suggestions regarding these issues. I am eager to understand what might be causing these problems, and how best to address them.<br><br><div>  <b>here is my code test_gstreamer.py:</b><br><div style="color:rgb(212,212,212);background-color:rgb(30,30,30);font-family:"Droid Sans Mono","monospace",monospace;font-size:16px;line-height:22px;white-space:pre"><br><div><span style="color:rgb(197,134,192)">import</span> datetime</div><div><span style="color:rgb(197,134,192)">import</span> pickle</div><div><span style="color:rgb(197,134,192)">from</span> multiprocessing.connection <span style="color:rgb(197,134,192)">import</span> Connection</div><div><span style="color:rgb(197,134,192)">import</span> threading</div><br><div><span style="color:rgb(197,134,192)">from</span> time <span style="color:rgb(197,134,192)">import</span> sleep</div><div><span style="color:rgb(197,134,192)">import</span> gi</div><div><span style="color:rgb(197,134,192)">from</span> log <span style="color:rgb(197,134,192)">import</span> setup_custom_logger</div><br><div>gi.require_version(<span style="color:rgb(206,145,120)">"Gst"</span>, <span style="color:rgb(206,145,120)">"1.0"</span>)</div><div>gi.require_version(<span style="color:rgb(206,145,120)">"GstRtp"</span>, <span style="color:rgb(206,145,120)">"1.0"</span>)</div><div>gi.require_version(<span style="color:rgb(206,145,120)">"GstVideo"</span>, <span style="color:rgb(206,145,120)">"1.0"</span>)</div><br><div><span style="color:rgb(197,134,192)">from</span> gi.repository <span style="color:rgb(197,134,192)">import</span> GLib, GObject, Gst, GstRtp, GstVideo  <span style="color:rgb(106,153,85)"># noqa:F401,E402</span></div><br><div>logger = setup_custom_logger(<span style="color:rgb(156,220,254)">__name__</span>)</div><br><div>GObject.threads_init()</div><div>Gst.init()</div><br><div>url = <span style="color:rgb(206,145,120)">'rtsp://localhost:8554/mystream'</span></div><div>pipeline = <span style="color:rgb(206,145,120)">''</span></div><br><br><div><span style="color:rgb(86,156,214)">def</span> <span style="color:rgb(220,220,170)">fly</span>():</div><div>    main_loop = GLib.MainLoop()</div><div>    main_loop_thread = threading.Thread(<span style="color:rgb(156,220,254)">target</span>=main_loop.run)</div><div>    main_loop_thread.start()</div><div>    command = <span style="color:rgb(86,156,214)">f</span><span style="color:rgb(206,145,120)">"rtspsrc name=rtspsrc location=</span><span style="color:rgb(86,156,214)">{</span>url<span style="color:rgb(86,156,214)">}</span><span style="color:rgb(206,145,120)"> protocols=tcp latency=10 max-rtcp-rtp-time-diff=10 ! errorignore ! rtph265depay name=depay ! appsink name=sink"</span></div><div>    pipeline = Gst.parse_launch(command)</div><div>    bus = pipeline.get_bus()</div><div>    bus.add_signal_watch()</div><div>    bus.connect(<span style="color:rgb(206,145,120)">"message"</span>, on_message)</div><div>    bus.enable_sync_message_emission()</div><br><div>    rtspsrc = pipeline.get_by_name(<span style="color:rgb(206,145,120)">"rtspsrc"</span>)</div><div>    rtspsrc.connect(<span style="color:rgb(206,145,120)">"new-manager"</span>, on_new_manager)</div><div>    depay = pipeline.get_by_name(<span style="color:rgb(206,145,120)">"depay"</span>)</div><div>    sinkpad = depay.get_static_pad(<span style="color:rgb(206,145,120)">"sink"</span>)</div><div>    probeID = sinkpad.add_probe(</div><div>        Gst.PadProbeType.BUFFER, calculate_timestamp)</div><br><div>    pipeline.set_state(Gst.State.PLAYING)</div><br><div>    <span style="color:rgb(197,134,192)">try</span>:</div><div>        <span style="color:rgb(197,134,192)">while</span> <span style="color:rgb(86,156,214)">True</span>:</div><div>            sleep(<span style="color:rgb(181,206,168)">0.1</span>)</div><div>    <span style="color:rgb(197,134,192)">except</span> <span style="color:rgb(78,201,176)">KeyboardInterrupt</span>:</div><div>        <span style="color:rgb(197,134,192)">pass</span></div><br><div>    pipeline.set_state(Gst.State.NULL)</div><div>    main_loop.quit()</div><div>    main_loop_thread.join()</div><br><br><div><span style="color:rgb(86,156,214)">def</span> <span style="color:rgb(220,220,170)">calculate_timestamp</span>(<span style="color:rgb(156,220,254)">pad</span>, <span style="color:rgb(156,220,254)">info</span>):</div><div>    <span style="color:rgb(197,134,192)">return</span> Gst.PadProbeReturn.OK</div><br><br><div><span style="color:rgb(86,156,214)">def</span> <span style="color:rgb(220,220,170)">on_new_manager</span>(<span style="color:rgb(156,220,254)">rtspsrc</span>, <span style="color:rgb(156,220,254)">manager</span>):</div><div>    <a href="http://logger.info">logger.info</a>(<span style="color:rgb(86,156,214)">f</span><span style="color:rgb(206,145,120)">"on_new_manager"</span>)</div><div>    sinkpad = manager.get_request_pad(<span style="color:rgb(206,145,120)">"recv_rtcp_sink_0"</span>)</div><div>    session = manager.emit(<span style="color:rgb(206,145,120)">"get-internal-session"</span>, <span style="color:rgb(181,206,168)">0</span>)</div><div>    session.connect(<span style="color:rgb(206,145,120)">"on-receiving-rtcp"</span>, on_receive_rtcp)</div><br><br><div><span style="color:rgb(86,156,214)">def</span> <span style="color:rgb(220,220,170)">on_receive_rtcp</span>(<span style="color:rgb(156,220,254)">session</span>, <span style="color:rgb(156,220,254)">buffer</span>: Gst.Buffer):</div><div>    <a href="http://logger.info">logger.info</a>(<span style="color:rgb(206,145,120)">"on_receive_rtcp"</span>)</div><div>    <a href="http://logger.info">logger.info</a>(buffer)</div><div>    rtcp_buffer = GstRtp.RTCPBuffer()</div><div>    res = GstRtp.RTCPBuffer.map(buffer, Gst.MapFlags.READ, rtcp_buffer)</div><div>    rtcp_packet = GstRtp.RTCPPacket()</div><div>    rtcp_buffer.get_first_packet(rtcp_packet)</div><br><div>    <span style="color:rgb(197,134,192)">while</span> <span style="color:rgb(86,156,214)">True</span>:</div><div>        <span style="color:rgb(197,134,192)">if</span> rtcp_packet.get_type() == <a href="http://GstRtp.RTCPType.SR">GstRtp.RTCPType.SR</a>:</div><div>            <a href="http://logger.info">logger.info</a>(rtcp_packet)</div><div>            si = rtcp_packet.sr_get_sender_info()</div><div>            ntp_time = si[<span style="color:rgb(181,206,168)">1</span>]</div><div>            rtp_time = si[<span style="color:rgb(181,206,168)">2</span>]</div><div>            <a href="http://logger.info">logger.info</a>(<span style="color:rgb(206,145,120)">"ntp_time: </span><span style="color:rgb(86,156,214)">%d</span><span style="color:rgb(206,145,120)">, rtp_time: </span><span style="color:rgb(86,156,214)">%d</span><span style="color:rgb(206,145,120)">"</span>, ntp_time, rtp_time)</div><div>        <span style="color:rgb(197,134,192)">if</span> rtcp_packet.move_to_next() == <span style="color:rgb(86,156,214)">False</span>:</div><div>            <span style="color:rgb(197,134,192)">break</span></div><br><br><div><span style="color:rgb(86,156,214)">def</span> <span style="color:rgb(220,220,170)">on_message</span>(<span style="color:rgb(156,220,254)">bus</span>: Gst.Bus, <span style="color:rgb(156,220,254)">message</span>: Gst.Message):</div><div>    mtype = message.type</div><div>    <span style="color:rgb(106,153,85)"># </span><span style="color:rgb(86,156,214)">TODO</span><span style="color:rgb(106,153,85)"> handle message</span></div><br><br><div><span style="color:rgb(197,134,192)">if</span> <span style="color:rgb(156,220,254)">__name__</span> == <span style="color:rgb(206,145,120)">"__main__"</span>:</div><div>    fly()</div><br></div></div><div><br></div><div><b>here is how I run a local RSTP server:</b><br><pre class="gmail-c-mrkdwn__pre"><font size="1">$ docker run --rm -it --network=host aler9/rtsp-simple-server
$ while true; do gst-launch-1.0 rtspclientsink name=s location=<a target="_blank" class="gmail-c-link" rel="noopener noreferrer">rtsp://localhost:8554/mystream</a>
 filesrc location=//path/to/video.mp4
 ! qtdemux name=d d.video_0 ! queue ! s.sink_0 d.audio_0 ! queue ! 
s.sink_1 ; done</font></pre><pre class="gmail-c-mrkdwn__pre"><br></pre><pre class="gmail-c-mrkdwn__pre"><b>here is the output from my code, no "on-receiving-rtcp" triggered:</b></pre><pre class="gmail-c-mrkdwn__pre"><font size="1">$ python test_gstreamer.py <br>test_gstreamer.py:19: PyGIDeprecationWarning: Since version 3.11, calling threads_init is no longer needed. See: <a href="https://wiki.gnome.org/PyGObject/Threading">https://wiki.gnome.org/PyGObject/Threading</a><br>  GObject.threads_init()<br>2023-05-21 15:05:06,467 - __main__ - INFO - on_new_manager<br><br>(python:96503): GStreamer-CRITICAL **: 15:05:06.469: Element manager already has a pad named recv_rtcp_sink_0, the behaviour of  gst_element_get_request_pad() for existing pads is undefined!
</font></pre></div><div><br></div><div><b>here is output with "on-receiving-rtcp" triggered:</b><br><pre class="gmail-c-mrkdwn__pre"><font size="1">$ python test_gstreamer.py 
test_gstreamer.py:19: PyGIDeprecationWarning: Since version 3.11, calling threads_init is no longer needed. See: <a target="_blank" class="gmail-c-link" href="https://wiki.gnome.org/PyGObject/Threading" rel="noopener noreferrer">https://wiki.gnome.org/PyGObject/Threading</a>
  GObject.threads_init()
2023-05-21 15:05:10,382 - __main__ - INFO - on_new_manager

(python:96548): GStreamer-CRITICAL **: 15:05:10.384: Element manager already has a pad named recv_rtcp_sink_0, the behaviour of  gst_element_get_request_pad() for existing pads is undefined!
2023-05-21 15:05:11,730 - __main__ - INFO - on_receive_rtcp
2023-05-21 15:05:11,730 - __main__ - INFO - <Gst.Buffer object at 0x7fbf3a6918e0 (GstBuffer at 0x166cc60)>
2023-05-21 15:05:11,732 - __main__ - INFO - <GstRtp.RTCPPacket object at 0x7fbf39c49860 (void at 0x7fbf2c08b390)>
2023-05-21 15:05:11,732 - __main__ - INFO - ntp_time: 16723062952823484284, rtp_time: 4125066893</font></pre></div><div><br></div></div>