RTP to Filesink in python fails to create a playable 3gp (h264 and AMR-WB streams) file
Edip Demirbilek
edip.demirbilek at gmail.com
Thu Dec 10 12:35:53 PST 2015
Hello everyone,
I am trying to implement reading a file (3gp container with h264 video and
AMR-WB audio streams) and stream it to the client over RTP using GStreamer
and save it at the destination.
With gst-launch on both server and client sides I have done that
successfully.
However the same pipeline coded in python does not work. The server side
receives EOS and closes the pipeline successfully (tested it against
gst-launch on the client). But the python client code never receives the EOS
signal, and after server side streaming done, stopping the pipeline creates
a file slightly less than the one streamed on the server in terms of the
size. The file generated fails to play either with gst-launch or other
standard video players. I can display the video stream and play the audio
stream successfully when I change the pipeline on the client side coded in
python. The only problem is saving the streams to a file in client coded in
python.
My python server and client side codes are below. I have tested it on
virtual machines as well as dedicated separate workstations, still not
working. Performance seems to be not the issue. Any suggestion please?
Best Regards
Edip
#!/usr/bin/env python
########### RTP SERVER
import gi
from gi.repository import GObject as gobject
from gi.repository import Gst as gst
gi.require_version('Gst', '1.0')
gobject.threads_init()
gst.init(None)
import sys
import os
import readline
REMOTE_HOST = '127.0.0.1'
mainloop = gobject.MainLoop()
pipeline = gst.Pipeline.new('server')
bus = pipeline.get_bus()
filesource = gst.ElementFactory.make("filesrc", "file-source")
filesource.set_property('location', "<dir>/Input.3gp")
dvdemux = gst.ElementFactory.make("qtdemux", "dvdemux")
q1 = gst.ElementFactory.make("queue", "q1")
q2 = gst.ElementFactory.make("queue", "q2")
rtph264pay = gst.ElementFactory.make("rtph264pay", "rtph264pay")
rtpamrpay = gst.ElementFactory.make("rtpamrpay", "rtpamrpay")
rtpbin = gst.ElementFactory.make('rtpbin', 'rtpbin')
#For Video
vudpsink_rtpout = gst.ElementFactory.make("udpsink", "vudpsink_rtpout")
vudpsink_rtpout.set_property('host', REMOTE_HOST)
vudpsink_rtpout.set_property('port', 5000)
vudpsink_rtcpout = gst.ElementFactory.make("udpsink", "vudpsink_rtcpout")
vudpsink_rtcpout.set_property('host', REMOTE_HOST)
vudpsink_rtcpout.set_property('port', 5001)
vudpsink_rtcpout.set_property('sync', False)
vudpsink_rtcpout.set_property('async', False)
vudpsrc_rtcpin = gst.ElementFactory.make("udpsrc", "vudpsrc_rtcpin")
vudpsrc_rtcpin.set_property('port', 5005)
#For Audio
audpsink_rtpout = gst.ElementFactory.make("udpsink", "audpsink_rtpout")
audpsink_rtpout.set_property('host', REMOTE_HOST)
audpsink_rtpout.set_property('port', 5002)
audpsink_rtcpout = gst.ElementFactory.make("udpsink", "audpsink_rtcpout")
audpsink_rtcpout.set_property('host', REMOTE_HOST)
audpsink_rtcpout.set_property('port', 5003)
audpsink_rtcpout.set_property('sync', False)
audpsink_rtcpout.set_property('async', False)
audpsrc_rtcpin = gst.ElementFactory.make("udpsrc", "audpsrc_rtcpin")
audpsrc_rtcpin.set_property('port', 5007)
# Add elements
pipeline.add(filesource)
pipeline.add(dvdemux)
pipeline.add(q1)
pipeline.add(q2)
pipeline.add(rtph264pay)
pipeline.add(rtpamrpay)
pipeline.add(rtpbin)
pipeline.add(vudpsink_rtpout)
pipeline.add(vudpsink_rtcpout)
pipeline.add(vudpsrc_rtcpin)
pipeline.add(audpsink_rtpout)
pipeline.add(audpsink_rtcpout)
pipeline.add(audpsrc_rtcpin)
def dvdemux_padded(demuxer, pad):
#print "Demux_Callback entry: "+str(pad.get_name())
if pad.get_name() == "video_0":
print "Video Template"
qv_pad = q1.get_static_pad("sink")
pad.link(qv_pad)
elif pad.get_name() == "audio_0":
print "Audio Template"
qa_pad = q2.get_static_pad("sink")
pad.link(qa_pad)
else:
print "Template: "+str(pad.get_property("template").name_template)
# Create links
dvdemux.connect('pad-added', dvdemux_padded)
filesource.link(dvdemux)
q1.link(rtph264pay)
q2.link(rtpamrpay)
rtph264pay.link_pads('src', rtpbin, 'send_rtp_sink_0')
rtpbin.link_pads('send_rtp_src_0', vudpsink_rtpout, 'sink')
rtpbin.link_pads('send_rtcp_src_0', vudpsink_rtcpout, 'sink')
vudpsrc_rtcpin.link_pads('src', rtpbin, 'recv_rtcp_sink_0')
rtpamrpay.link_pads('src', rtpbin, 'send_rtp_sink_1')
rtpbin.link_pads('send_rtp_src_1', audpsink_rtpout, 'sink')
rtpbin.link_pads('send_rtcp_src_1', audpsink_rtcpout, 'sink')
audpsrc_rtcpin.link_pads('src', rtpbin, 'recv_rtcp_sink_1')
def eos():
print "Sending EOS to pipeline."
print pipeline.send_event(gst.Event.new_eos())
print "Changing pipeline state to NULL."
print pipeline.set_state(gst.State.NULL)
print "Quitting the Mainloop."
print mainloop.quit()
def on_message(bus, message):
#print "On_Message entry."
t = message.type
if t == gst.MessageType.EOS:
print "EOS Message."
eos()
elif t == gst.MessageType.ERROR:
err, debug = message.parse_error()
print "Error: %s" % err, debug
pipeline.set_state(gst.State.NULL)
#else:
# print "Else Message: "+str(t)
def on_sync_message(bus, message):
print "On_Sync_Message entry."
def on_finish(bus, message):
print "Message:Finish."
def on_eos(bus, msg):
print "On_Eos."
eos()
def on_error(bus, msg):
err, debug = msg.parse_error()
print "Error: %s" % err, debug
def on_warning(bus, msg):
print "Warning."
print "Creating the bus."
bus.add_signal_watch()
bus.connect('message::eos', on_eos)
bus.connect('message::error', on_error)
bus.connect('message::warning', on_warning)
bus.connect("sync-message::element", on_sync_message)
bus.add_signal_watch()
def go():
print ("Setting pipeline to PLAYING")
print pipeline.set_state(gst.State.PLAYING)
print ("Waiting pipeline to settle")
print pipeline.get_state(1000)
try:
print "Running the Mainloop."
mainloop.run()
print pipeline.set_state(gst.State.NULL)
except KeyboardInterrupt:
eos()
go()
#!/usr/bin/env python
################ RTP CLIENT
import gi
from gi.repository import GObject as gobject
from gi.repository import Gst as gst
gi.require_version('Gst', '1.0')
gobject.threads_init()
gst.init(None)
import sys
import os
import readline
REMOTE_HOST = '127.0.0.1'
mainloop = gobject.MainLoop()
pipeline = gst.Pipeline.new('client')
bus = pipeline.get_bus()
rtpbin = gst.ElementFactory.make('rtpbin', 'rtpbin')
# For Video
vudpsrc_rtpin = gst.ElementFactory.make('udpsrc', 'vudpsrc_rtpin')
vudpsrc_rtpin.set_property('port', 5000)
vudpsrc_caps =
gst.caps_from_string("application/x-rtp,media=(string)video,clock-rate=(int)
90000,encoding-name=(string)H264")
vudpsrc_rtpin.set_property('caps', vudpsrc_caps)
vudpsrc_rtcpin = gst.ElementFactory.make('udpsrc', 'vudpsrc_rtcpin')
vudpsrc_rtcpin.set_property('port', 5001)
vudpsink_rtcpout = gst.ElementFactory.make('udpsink', 'vudpsink_rtcpout')
vudpsink_rtcpout.set_property('host', REMOTE_HOST)
vudpsink_rtcpout.set_property('port', 5005)
vudpsink_rtcpout.set_property('sync', False)
vudpsink_rtcpout.set_property('async', False)
#For Audio
audpsrc_rtpin = gst.ElementFactory.make('udpsrc', 'audpsrc_rtpin')
audpsrc_rtpin.set_property('port', 5002)
audpsrc_caps =
gst.caps_from_string("application/x-rtp,media=(string)audio,clock-rate=(int)
16000,encoding-name=(string)AMR-WB,encoding-params=(string)1,octet-align=(st
ring)1")
audpsrc_rtpin.set_property('caps', audpsrc_caps)
audpsrc_rtcpin = gst.ElementFactory.make('udpsrc', 'audpsrc_rtcpin')
audpsrc_rtcpin.set_property('port', 5003)
audpsink_rtcpout = gst.ElementFactory.make('udpsink', 'audpsink_rtcpout')
audpsink_rtcpout.set_property('host', REMOTE_HOST)
audpsink_rtcpout.set_property('port', 5007)
audpsink_rtcpout.set_property('sync', False)
audpsink_rtcpout.set_property('async', False)
rtph264depay = gst.ElementFactory.make("rtph264depay", "rtph264depay")
rtpamrdepay = gst.ElementFactory.make("rtpamrdepay", "rtpamrdepay")
avdec_h264 = gst.ElementFactory.make("avdec_h264", "avdec_h264")
videoconvert = gst.ElementFactory.make("videoconvert", "videoconvert")
avdec_amrwb = gst.ElementFactory.make("avdec_amrwb", "avdec_amrwb")
audioconvert = gst.ElementFactory.make("audioconvert", "audioconvert")
progressreport = gst.ElementFactory.make("progressreport", "progressreport")
#File
x264enc = gst.ElementFactory.make("x264enc", "x264enc")
voamrwbenc = gst.ElementFactory.make("voamrwbenc", "voamrwbenc")
qtmux = gst.ElementFactory.make("qtmux", "qtmux")
filesink = gst.ElementFactory.make("filesink", "filesink")
filesink.set_property("location", "<dir>/Captured.3gp")
pipeline.add(rtpbin)
pipeline.add(vudpsrc_rtpin)
pipeline.add(vudpsrc_rtcpin)
pipeline.add(vudpsink_rtcpout)
pipeline.add(audpsrc_rtpin)
pipeline.add(audpsrc_rtcpin)
pipeline.add(audpsink_rtcpout)
pipeline.add(rtph264depay)
pipeline.add(rtpamrdepay)
pipeline.add(avdec_h264)
pipeline.add(avdec_amrwb)
pipeline.add(videoconvert)
pipeline.add(audioconvert)
pipeline.add(progressreport)
#File
pipeline.add(x264enc)
pipeline.add(voamrwbenc)
pipeline.add(qtmux)
pipeline.add(filesink)
def pad_added_cb(rtpbin, pad, depay):
#print "Pad Added entry. Template: "
+str(pad.get_property("template").name_template)
#print "Pad name: " +str(pad.get_name())
#print "Depay name: "+str(depay.get_name())
if pad.get_name().startswith("recv_rtp_src_0") and
depay.get_name().startswith("rtph264depay"):
print "Linking video depay"
depay_pad = rtph264depay.get_static_pad("sink")
pad.link(depay_pad)
elif pad.get_name().startswith("recv_rtp_src_1") and
depay.get_name().startswith("rtpamrdepay"):
print "Linking audio depay"
depay_pad = rtpamrdepay.get_static_pad("sink")
pad.link(depay_pad)
#else:
# print "Can not do anything."
print "Linking static pads."
vudpsrc_rtpin.link_pads('src', rtpbin, 'recv_rtp_sink_0')
vudpsrc_rtcpin.link_pads('src', rtpbin, 'recv_rtcp_sink_0')
rtpbin.link_pads('send_rtcp_src_0', vudpsink_rtcpout, 'sink')
audpsrc_rtpin.link_pads('src', rtpbin, 'recv_rtp_sink_1')
audpsrc_rtcpin.link_pads('src', rtpbin, 'recv_rtcp_sink_1')
rtpbin.link_pads('send_rtcp_src_1', audpsink_rtcpout, 'sink')
rtpbin.connect('pad-added', pad_added_cb, rtph264depay)
rtpbin.connect('pad-added', pad_added_cb, rtpamrdepay)
rtph264depay.link(avdec_h264)
avdec_h264.link(videoconvert)
rtpamrdepay.link(avdec_amrwb)
avdec_amrwb.link(audioconvert)
#File
videoconvert.link(x264enc)
x264enc.link_pads('src', qtmux, 'video_0')
audioconvert.link(progressreport)
progressreport.link(voamrwbenc)
voamrwbenc.link_pads('src', qtmux, 'audio_0')
qtmux.link(filesink)
def on_message(bus, message):
# print "On_Message entry."
t = message.type
if t == gst.MessageType.EOS:
print "EOS Message received ###################."
eos()
elif t == gst.MessageType.ERROR:
err, debug = message.parse_error()
print "Error: %s" % err, debug
pipeline.set_state(gst.State.NULL)
# else:
# print "Else Message: "+str(t)
def on_sync_message(bus, message):
print "On_Sync_Message entry."
def on_eos(bus, msg):
print "On_Eos."
eos()
def on_error(bus, msg):
err, debug = msg.parse_error()
print "Error: %s" % err, debug
def on_warning(bus, msg):
print "Warning."
def on_state(bus, msg):
print "State Changed."
print "Creating the bus."
bus.enable_sync_message_emission()
#bus.connect("message", on_message)
bus.connect('message::eos', on_eos)
bus.connect('message::error', on_error)
bus.connect('message::warning', on_warning)
#bus.connect('message::state-changed', on_state)
bus.connect("sync-message::element", on_sync_message)
bus.add_signal_watch()
def start():
pipeline.set_state(gst.State.PLAYING)
vudpsink_rtcpout.set_locked_state(gst.State.PLAYING)
audpsink_rtcpout.set_locked_state(gst.State.PLAYING)
print "Started..."
def eos():
print "Sending EOS to pipeline."
print pipeline.send_event(gst.Event.new_eos())
print "Changing pipeline state to NULL."
print pipeline.set_state(gst.State.NULL)
print "Quitting the Mainloop."
print mainloop.quit()
def loop():
try:
print "Running the Mainloop."
mainloop.run()
except KeyboardInterrupt:
eos()
if __name__ == '__main__':
start()
loop()
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.freedesktop.org/archives/gstreamer-devel/attachments/20151210/9d04855d/attachment-0001.html>
More information about the gstreamer-devel
mailing list