Restart Pipeline
WisdomPill
anas.el.amraoui at live.com
Wed Jan 10 12:36:17 UTC 2018
I solved the issue adding the pad-added signal to the rtspsrc and unlinking
rtspsrc from the rtpjitterbuffer.
My code is the following, but it's Python.
import logging
import gi
from threading import Thread, Lock
from time import sleep
from lib.utils import get_pad_info
gi.require_version('GstVideo', '1.0')
gi.require_version('Gst', '1.0')
gi.require_version('Gtk', '3.0')
# GstVideo is needed for set_window_handle(), dont't know why pycharm
removes it
from gi.repository import Gst, Gtk, Gdk, Pango, GstVideo, GObject
class GstWidget(Gtk.DrawingArea):
def __init__(self, stream, parent, conf):
super(GstWidget, self).__init__()
self.stream = stream
label_height = conf.label_height
restore_pipeline_period = conf.restore_pipeline_period
self.location = 'rtsp://{}:{}/{}'.format(stream.host, stream.port,
stream.path)
self.log = logging.getLogger('visualizer')
self.log.info('[{}] Initializing stream label of
{}'.format(stream.name, stream.name))
self.label = Gtk.Label()
stream_rectangle = stream.rectangle
self.label.set_size_request(stream_rectangle.width, label_height)
self.label.override_background_color(Gtk.StateType.NORMAL,
Gdk.RGBA(1.0, 1.0, 1.0, 1.0))
self.label.modify_font(Pango.font_description_from_string('font-size: 20'))
parent.put(self.label, stream.rectangle.x, stream.rectangle.y +
stream_rectangle.height)
self.log.info('[{}] Done label!'.format(stream.name))
stream_rectangle = self.stream.rectangle
self.set_size_request(stream_rectangle.width,
stream_rectangle.height)
self.stream_connection_thread = None
self.reconnecting = False
self.lock = Lock()
self.closed = False
self.player = Gst.Pipeline.new('player')
source = Gst.ElementFactory.make('rtspsrc', 'source')
source.set_property('location', self.location)
source.connect('pad-added', self.on_pad_added)
jitter = Gst.ElementFactory.make('rtpjitterbuffer', 'jitter')
depayer = Gst.ElementFactory.make('rtph264depay', 'depayer')
decoder = Gst.ElementFactory.make('avdec_h264', 'decoder')
converter = Gst.ElementFactory.make('videoconvert', 'converter')
if conf.timers_overlay:
timeoverlay = Gst.ElementFactory.make('timeoverlay',
'timeoverlay')
clockoverlay = Gst.ElementFactory.make('clockoverlay',
'clockoverlay')
clockoverlay.set_property('halignment', 'right')
sink = Gst.ElementFactory.make('autovideosink', 'sink')
sink.set_property('sync', False)
self.player.add(source)
self.player.add(jitter)
self.player.add(depayer)
self.player.add(decoder)
self.player.add(converter)
if conf.timers_overlay:
self.player.add(timeoverlay)
self.player.add(clockoverlay)
self.player.add(sink)
source.link(jitter)
jitter.link(depayer)
depayer.link(decoder)
if conf.timers_overlay:
decoder.link(timeoverlay)
timeoverlay.link(clockoverlay)
clockoverlay.link(converter)
else:
decoder.link(converter)
converter.link(sink)
self.bus = self.player.get_bus()
self.bus.add_signal_watch()
self.bus.connect('message::warning', self.on_warning)
self.bus.connect('message::error', self.on_error)
self.bus.connect('message::state-changed', self.on_status_changed)
self.bus.connect('message::eos', self.on_eos)
self.bus.enable_sync_message_emission()
self.bus.connect('sync-message::element', self.set_frame_handle)
self.restore_pipeline()
if conf.restore_pipeline_period != 0:
GObject.timeout_add_seconds(restore_pipeline_period * 60,
self.restore_pipeline)
def set_online_label(self):
self.log.info('[{}] {} is online'.format(self.stream.name,
self.stream.name))
self.label.set_text(self.stream.name)
def set_offline_label(self):
self.log.info('[{}] {} is offline'.format(self.stream.name,
self.stream.name))
self.label.set_text('{} is offline'.format(self.stream.name))
def on_status_changed(self, bus, message):
msg = message.parse_state_changed()
self.log.info('[{}] status_changed message ->
{}'.format(self.stream.name, msg))
if msg.newstate == Gst.State.PLAYING:
self.set_online_label()
else:
self.set_offline_label()
def send_eos(self):
self.log.debug('[{}] Sending eos'.format(self.stream.name))
self.player.send_event(Gst.Event.new_eos())
def on_eos(self, bus, message):
self.log.warning('[{}] eos message -> {}'.format(self.stream.name,
message))
self.restore_pipeline()
def on_warning(self, bus, message):
self.log.warning('[{}] warning message ->
{}'.format(self.stream.name, message.parse_warning()))
self.restore_pipeline()
def on_error(self, bus, message):
self.log.error('[{}] error message -> {}'.format(self.stream.name,
message.parse_error()))
self.restore_pipeline()
def activate_reconnection(self):
connection_thread = self.stream_connection_thread
self.log.warning('[{}] connection thread {}, player.current_state
{}'.format(self.stream.name, connection_thread, self.player.current_state))
if self.player.current_state == Gst.State.NULL and \
(connection_thread is None or (connection_thread and not
connection_thread.is_alive())):
self.stream_connection_thread =
Thread(target=self.reconnect_stream)
self.stream_connection_thread.start()
self.log.warning('[{}] I supposedly lost the
connection!'.format(self.stream.name))
def reconnect_stream(self):
self.log.info('[{}] reconnecting the
stream!'.format(self.stream.name))
while self.player.current_state != Gst.State.PLAYING:
# Applied De Morgan rules !playing && !closed => !(playing or
closed)
self.log.info('[{}] I\'m trying to
reconnect!'.format(self.stream.name))
self.play()
sleep(self.stream.sleep_time)
self.log.info('[{}] Just woke up'.format(self.stream.name))
def stop(self):
self.log.info('[{}] Current state of my pipeline is
{}'.format(self.stream.name, self.player.current_state))
self.log.info('[{}] setting pipeline state to
null'.format(self.stream.name))
self.set_player_state(Gst.State.NULL)
def play(self):
self.log.info('[{}] Current state of my pipeline is
{}'.format(self.stream.name, self.player.current_state))
self.log.info('[{}] setting pipeline state to
playing'.format(self.stream.name))
outcome = self.set_player_state(Gst.State.PLAYING)
if outcome == Gst.StateChangeReturn.ASYNC:
self.log.warning('[{}] Got ASYNC state change in
return'.format(self.stream.name))
def restore_pipeline(self):
self.log.info('[{}] asking for the lock'.format(self.stream.name))
self.lock.acquire()
if not self.reconnecting:
self.reconnecting = True
else:
self.lock.release()
self.log.info('[{}] released the lock'.format(self.stream.name))
return True
self.lock.release()
self.log.info('[{}] released the lock'.format(self.stream.name))
while self.player.current_state != Gst.State.NULL:
self.stop()
self.set_offline_label()
source = self.player.get_child_by_name('source')
jitter = self.player.get_child_by_name('jitter')
ret = source.unlink(jitter)
self.log.info('[{}] unlinking source to jitter got me
{}'.format(self.stream.name, ret))
self.activate_reconnection()
self.log.info('[{}] asking for the lock'.format(self.stream.name))
self.lock.acquire()
self.reconnecting = False
self.lock.release()
self.log.info('[{}] released the lock'.format(self.stream.name))
return True
def on_pad_added(self, element, pad):
self.log.warning('[{}] pad-added to {}'.format(self.stream.name,
pad.name))
jitter = self.player.get_child_by_name('jitter')
jitter_pad = jitter.get_static_pad('sink')
retval = pad.link(jitter_pad)
self.log.warning('[{}] linked pad got me
{}'.format(self.stream.name, retval))
def set_player_state(self, state):
outcome = self.player.set_state(state)
self.log.info('[{}] Called status change to {} got me
{}'.format(self.stream.name, state, outcome))
return outcome
def close(self):
self.closed = True
self.player.set_state(Gst.State.NULL)
def status(self):
response = {
'player_state': self.player.current_state.value_nick,
'label': self.label.get_text()
}
recon_thread = self.stream_connection_thread
if recon_thread:
response['reconnection_thread'] = {
'alive': recon_thread.is_alive(),
'name': recon_thread.name,
'daemon': recon_thread.daemon
}
pipeline = self.player
response['pipeline'] = {
'pads': {
pad.name: get_pad_info(pad) for pad in pipeline.pads
}
}
clock = pipeline.clock
if clock:
response['pipeline']['clock'] = {
'time': clock.get_time(),
'resolution': clock.get_resolution(),
'timeout': clock.get_timeout(),
'internal time': clock.get_internal_time(),
'floating': clock.is_floating(),
'synced': clock.is_synced(),
'name': clock.get_name(),
'calibration': clock.get_calibration()
}
response['elements'] = [
{
'name': child.name,
'state': child.current_state.value_nick,
'flags': child.flags,
'pads': {
pad.name: get_pad_info(pad) for pad in child.pads
}
}
for child in pipeline.children]
return response
def set_frame_handle(self, bus, message):
if message.get_structure().get_name() == 'prepare-window-handle':
frame = message.src
frame.set_property('force-aspect-ratio', True)
Gdk.threads_enter()
frame.set_window_handle(self.get_window().get_xid())
Gdk.threads_leave()
--
Sent from: http://gstreamer-devel.966125.n4.nabble.com/
More information about the gstreamer-devel
mailing list