RTP to Filesink in python fails to create a playable 3gp (h264 and AMR-WB streams) file

Edip Demirbilek edip.demirbilek at gmail.com
Thu Dec 10 12:35:53 PST 2015


Hello everyone,

 

I am trying to implement reading a file (3gp container with h264 video and
AMR-WB audio streams) and stream it to the client over RTP using GStreamer
and save it at the destination.

 

With gst-launch on both server and client sides I have done that
successfully. 

 

However the same pipeline coded in python does not work. The server side
receives EOS and closes the pipeline successfully (tested it against
gst-launch on the client). But the python client code never receives the EOS
signal, and after server side streaming done, stopping the pipeline creates
a file slightly less than the one streamed on the server in terms of the
size. The file generated fails to play either with gst-launch or other
standard video players. I can display the video stream and play the audio
stream successfully when I change the pipeline on the client side coded in
python. The only problem is saving the streams to a file in client coded in
python.

 

My python server and client side codes are below. I have tested it on
virtual machines as well as dedicated separate workstations, still not
working. Performance seems to be not the issue. Any suggestion please?

 

Best Regards

Edip

 

#!/usr/bin/env python

########### RTP SERVER 

 

import gi

from gi.repository import GObject as gobject

from gi.repository import Gst as gst

gi.require_version('Gst', '1.0')

gobject.threads_init()

gst.init(None)

 

import sys

import os

import readline

 

REMOTE_HOST = '127.0.0.1'

 

mainloop = gobject.MainLoop()

pipeline = gst.Pipeline.new('server')

bus = pipeline.get_bus()

 

filesource = gst.ElementFactory.make("filesrc", "file-source")

filesource.set_property('location', "<dir>/Input.3gp")

 

dvdemux = gst.ElementFactory.make("qtdemux", "dvdemux")

 

q1 = gst.ElementFactory.make("queue", "q1")

q2 = gst.ElementFactory.make("queue", "q2")

rtph264pay = gst.ElementFactory.make("rtph264pay", "rtph264pay")

rtpamrpay = gst.ElementFactory.make("rtpamrpay", "rtpamrpay")

 

rtpbin = gst.ElementFactory.make('rtpbin', 'rtpbin')

 

#For Video

vudpsink_rtpout = gst.ElementFactory.make("udpsink", "vudpsink_rtpout")

vudpsink_rtpout.set_property('host', REMOTE_HOST)

vudpsink_rtpout.set_property('port', 5000)

 

vudpsink_rtcpout = gst.ElementFactory.make("udpsink", "vudpsink_rtcpout")

vudpsink_rtcpout.set_property('host', REMOTE_HOST)

vudpsink_rtcpout.set_property('port', 5001)

vudpsink_rtcpout.set_property('sync', False)

vudpsink_rtcpout.set_property('async', False)

 

vudpsrc_rtcpin = gst.ElementFactory.make("udpsrc", "vudpsrc_rtcpin")

vudpsrc_rtcpin.set_property('port', 5005)

 

#For Audio

audpsink_rtpout = gst.ElementFactory.make("udpsink", "audpsink_rtpout")

audpsink_rtpout.set_property('host', REMOTE_HOST)

audpsink_rtpout.set_property('port', 5002)

 

audpsink_rtcpout = gst.ElementFactory.make("udpsink", "audpsink_rtcpout")

audpsink_rtcpout.set_property('host', REMOTE_HOST)

audpsink_rtcpout.set_property('port', 5003)

audpsink_rtcpout.set_property('sync', False)

audpsink_rtcpout.set_property('async', False)

 

audpsrc_rtcpin = gst.ElementFactory.make("udpsrc", "audpsrc_rtcpin")

audpsrc_rtcpin.set_property('port', 5007)

 

# Add elements

pipeline.add(filesource)

pipeline.add(dvdemux)

pipeline.add(q1)

pipeline.add(q2)

pipeline.add(rtph264pay)

pipeline.add(rtpamrpay)

pipeline.add(rtpbin)

pipeline.add(vudpsink_rtpout)

pipeline.add(vudpsink_rtcpout)

pipeline.add(vudpsrc_rtcpin)

pipeline.add(audpsink_rtpout)

pipeline.add(audpsink_rtcpout)

pipeline.add(audpsrc_rtcpin)

 

def dvdemux_padded(demuxer, pad):

    #print "Demux_Callback entry: "+str(pad.get_name())

    if pad.get_name() == "video_0":

        print "Video Template"

        qv_pad = q1.get_static_pad("sink")

        pad.link(qv_pad)

    elif pad.get_name() == "audio_0":

        print "Audio Template"

        qa_pad = q2.get_static_pad("sink")

        pad.link(qa_pad)

    else:

        print "Template: "+str(pad.get_property("template").name_template)

 

# Create links

dvdemux.connect('pad-added', dvdemux_padded)

 

filesource.link(dvdemux)

q1.link(rtph264pay)

q2.link(rtpamrpay)

 

rtph264pay.link_pads('src', rtpbin, 'send_rtp_sink_0')

rtpbin.link_pads('send_rtp_src_0', vudpsink_rtpout, 'sink')

rtpbin.link_pads('send_rtcp_src_0', vudpsink_rtcpout, 'sink')

vudpsrc_rtcpin.link_pads('src', rtpbin, 'recv_rtcp_sink_0')

 

rtpamrpay.link_pads('src', rtpbin, 'send_rtp_sink_1')

rtpbin.link_pads('send_rtp_src_1', audpsink_rtpout, 'sink')

rtpbin.link_pads('send_rtcp_src_1', audpsink_rtcpout, 'sink')

audpsrc_rtcpin.link_pads('src', rtpbin, 'recv_rtcp_sink_1')

 

 

def eos():

    print "Sending EOS to pipeline."

    print pipeline.send_event(gst.Event.new_eos())

    print "Changing pipeline state to NULL."

    print pipeline.set_state(gst.State.NULL)

    print "Quitting the Mainloop."

    print mainloop.quit()

    

def on_message(bus, message):

    #print "On_Message entry."

    t = message.type

    if t == gst.MessageType.EOS:

        print "EOS Message."

        eos()

    elif t == gst.MessageType.ERROR:

        err, debug = message.parse_error()

        print "Error: %s" % err, debug

        pipeline.set_state(gst.State.NULL)

    #else:

    #    print "Else Message: "+str(t)

            

def on_sync_message(bus, message):

    print "On_Sync_Message entry."

    

def on_finish(bus, message):

    print "Message:Finish."

    

def on_eos(bus, msg):

    print "On_Eos."

    eos()

    

def on_error(bus, msg):

   err, debug = msg.parse_error()

   print "Error: %s" % err, debug

 

def on_warning(bus, msg):

   print "Warning."

 

 

            

print "Creating the bus."

bus.add_signal_watch()

bus.connect('message::eos', on_eos)

bus.connect('message::error', on_error)

bus.connect('message::warning', on_warning)

bus.connect("sync-message::element", on_sync_message)

bus.add_signal_watch()

 

def go():

    print ("Setting pipeline to PLAYING")

    print pipeline.set_state(gst.State.PLAYING)

    print ("Waiting pipeline to settle")

    print pipeline.get_state(1000)

    try:

        print "Running the Mainloop."

        mainloop.run()

        print pipeline.set_state(gst.State.NULL)

    except KeyboardInterrupt:

        eos()

    

go()

 

 

#!/usr/bin/env python

################ RTP CLIENT

 

import gi

from gi.repository import GObject as gobject

from gi.repository import Gst as gst

gi.require_version('Gst', '1.0')

gobject.threads_init()

gst.init(None)

 

import sys

import os

import readline

 

REMOTE_HOST = '127.0.0.1'

 

mainloop = gobject.MainLoop()

pipeline = gst.Pipeline.new('client')

bus = pipeline.get_bus()

 

rtpbin = gst.ElementFactory.make('rtpbin', 'rtpbin')

 

# For Video

vudpsrc_rtpin = gst.ElementFactory.make('udpsrc', 'vudpsrc_rtpin')

vudpsrc_rtpin.set_property('port', 5000)

vudpsrc_caps =
gst.caps_from_string("application/x-rtp,media=(string)video,clock-rate=(int)
90000,encoding-name=(string)H264")

vudpsrc_rtpin.set_property('caps', vudpsrc_caps)

 

vudpsrc_rtcpin = gst.ElementFactory.make('udpsrc', 'vudpsrc_rtcpin')

vudpsrc_rtcpin.set_property('port', 5001)

 

vudpsink_rtcpout = gst.ElementFactory.make('udpsink', 'vudpsink_rtcpout')

vudpsink_rtcpout.set_property('host', REMOTE_HOST)

vudpsink_rtcpout.set_property('port', 5005)

vudpsink_rtcpout.set_property('sync', False)

vudpsink_rtcpout.set_property('async', False)

 

#For Audio

audpsrc_rtpin = gst.ElementFactory.make('udpsrc', 'audpsrc_rtpin')

audpsrc_rtpin.set_property('port', 5002)

audpsrc_caps =
gst.caps_from_string("application/x-rtp,media=(string)audio,clock-rate=(int)
16000,encoding-name=(string)AMR-WB,encoding-params=(string)1,octet-align=(st
ring)1")

audpsrc_rtpin.set_property('caps', audpsrc_caps)

 

audpsrc_rtcpin = gst.ElementFactory.make('udpsrc', 'audpsrc_rtcpin')

audpsrc_rtcpin.set_property('port', 5003)

 

audpsink_rtcpout = gst.ElementFactory.make('udpsink', 'audpsink_rtcpout')

audpsink_rtcpout.set_property('host', REMOTE_HOST)

audpsink_rtcpout.set_property('port', 5007)

audpsink_rtcpout.set_property('sync', False)

audpsink_rtcpout.set_property('async', False)

 

rtph264depay = gst.ElementFactory.make("rtph264depay", "rtph264depay")

rtpamrdepay = gst.ElementFactory.make("rtpamrdepay", "rtpamrdepay")

 

avdec_h264 = gst.ElementFactory.make("avdec_h264", "avdec_h264")

videoconvert = gst.ElementFactory.make("videoconvert", "videoconvert")

 

avdec_amrwb = gst.ElementFactory.make("avdec_amrwb", "avdec_amrwb")

audioconvert = gst.ElementFactory.make("audioconvert", "audioconvert")

progressreport = gst.ElementFactory.make("progressreport", "progressreport")

 

#File

x264enc = gst.ElementFactory.make("x264enc", "x264enc")

voamrwbenc = gst.ElementFactory.make("voamrwbenc", "voamrwbenc")

qtmux = gst.ElementFactory.make("qtmux", "qtmux")

filesink = gst.ElementFactory.make("filesink", "filesink")

filesink.set_property("location", "<dir>/Captured.3gp")

 

pipeline.add(rtpbin)

pipeline.add(vudpsrc_rtpin)

pipeline.add(vudpsrc_rtcpin)

pipeline.add(vudpsink_rtcpout)

pipeline.add(audpsrc_rtpin)

pipeline.add(audpsrc_rtcpin)

pipeline.add(audpsink_rtcpout)

pipeline.add(rtph264depay)

pipeline.add(rtpamrdepay)

pipeline.add(avdec_h264)

pipeline.add(avdec_amrwb)

pipeline.add(videoconvert)

pipeline.add(audioconvert)

pipeline.add(progressreport)

 

#File

pipeline.add(x264enc)

pipeline.add(voamrwbenc)

pipeline.add(qtmux)

pipeline.add(filesink)

 

def pad_added_cb(rtpbin, pad, depay):

    #print "Pad Added entry. Template: "
+str(pad.get_property("template").name_template)

    #print "Pad name: " +str(pad.get_name())  

    #print "Depay name: "+str(depay.get_name())

    

    if pad.get_name().startswith("recv_rtp_src_0") and
depay.get_name().startswith("rtph264depay"):

        print "Linking video depay"

        depay_pad = rtph264depay.get_static_pad("sink")

        pad.link(depay_pad)

    elif pad.get_name().startswith("recv_rtp_src_1") and
depay.get_name().startswith("rtpamrdepay"):

        print "Linking audio depay"

        depay_pad = rtpamrdepay.get_static_pad("sink")

        pad.link(depay_pad)

    #else:

    #    print "Can not do anything."

 

print "Linking static pads."

vudpsrc_rtpin.link_pads('src', rtpbin, 'recv_rtp_sink_0') 

vudpsrc_rtcpin.link_pads('src', rtpbin, 'recv_rtcp_sink_0') 

rtpbin.link_pads('send_rtcp_src_0', vudpsink_rtcpout, 'sink') 

 

audpsrc_rtpin.link_pads('src', rtpbin, 'recv_rtp_sink_1') 

audpsrc_rtcpin.link_pads('src', rtpbin, 'recv_rtcp_sink_1') 

rtpbin.link_pads('send_rtcp_src_1', audpsink_rtcpout, 'sink')

 

rtpbin.connect('pad-added', pad_added_cb, rtph264depay)

rtpbin.connect('pad-added', pad_added_cb, rtpamrdepay)

 

rtph264depay.link(avdec_h264)

avdec_h264.link(videoconvert)

 

rtpamrdepay.link(avdec_amrwb)

avdec_amrwb.link(audioconvert)

 

#File

videoconvert.link(x264enc)

x264enc.link_pads('src', qtmux, 'video_0')

audioconvert.link(progressreport)

progressreport.link(voamrwbenc)

voamrwbenc.link_pads('src', qtmux, 'audio_0')

qtmux.link(filesink)

 

 

def on_message(bus, message):

#    print "On_Message entry."

    t = message.type

    if t == gst.MessageType.EOS:

        print "EOS Message received ###################."

        eos()

    elif t == gst.MessageType.ERROR:

        err, debug = message.parse_error()

        print "Error: %s" % err, debug

        pipeline.set_state(gst.State.NULL)

#    else:

#        print "Else Message: "+str(t)

            

def on_sync_message(bus, message):

    print "On_Sync_Message entry."

     

def on_eos(bus, msg):

    print "On_Eos."

    eos()

    

def on_error(bus, msg):

   err, debug = msg.parse_error()

   print "Error: %s" % err, debug

 

def on_warning(bus, msg):

   print "Warning."

 

def on_state(bus, msg):

   print "State Changed."

    

print "Creating the bus."

 

bus.enable_sync_message_emission()

#bus.connect("message", on_message)

bus.connect('message::eos', on_eos)

bus.connect('message::error', on_error)

bus.connect('message::warning', on_warning)

#bus.connect('message::state-changed', on_state)

bus.connect("sync-message::element", on_sync_message)

bus.add_signal_watch()

 

def start():

    pipeline.set_state(gst.State.PLAYING)

    vudpsink_rtcpout.set_locked_state(gst.State.PLAYING)

    audpsink_rtcpout.set_locked_state(gst.State.PLAYING)

    print "Started..."

 

def eos():

    print "Sending EOS to pipeline."

    print pipeline.send_event(gst.Event.new_eos())

    print "Changing pipeline state to NULL."

    print pipeline.set_state(gst.State.NULL)

    print "Quitting the Mainloop."

    print mainloop.quit()

    

    

def loop():   

    try:

        print "Running the Mainloop."

        mainloop.run()

    except KeyboardInterrupt:

        eos()

 

if __name__ == '__main__':

    start()

    loop()

-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.freedesktop.org/archives/gstreamer-devel/attachments/20151210/9d04855d/attachment-0001.html>


More information about the gstreamer-devel mailing list