Restart Pipeline

WisdomPill anas.el.amraoui at live.com
Wed Jan 10 12:36:17 UTC 2018


I solved the issue adding the pad-added signal to the rtspsrc and unlinking
rtspsrc from the rtpjitterbuffer.
My code is the following, but it's Python.

import logging
import gi
from threading import Thread, Lock
from time import sleep

from lib.utils import get_pad_info

gi.require_version('GstVideo', '1.0')
gi.require_version('Gst', '1.0')
gi.require_version('Gtk', '3.0')

# GstVideo is needed for set_window_handle(), dont't know why pycharm
removes it
from gi.repository import Gst, Gtk, Gdk, Pango, GstVideo, GObject


class GstWidget(Gtk.DrawingArea):
    def __init__(self, stream, parent, conf):
        super(GstWidget, self).__init__()
        self.stream = stream
        label_height = conf.label_height
        restore_pipeline_period = conf.restore_pipeline_period

        self.location = 'rtsp://{}:{}/{}'.format(stream.host, stream.port,
stream.path)

        self.log = logging.getLogger('visualizer')

        self.log.info('[{}] Initializing stream label of
{}'.format(stream.name, stream.name))
        self.label = Gtk.Label()
        stream_rectangle = stream.rectangle
        self.label.set_size_request(stream_rectangle.width, label_height)
        self.label.override_background_color(Gtk.StateType.NORMAL,
Gdk.RGBA(1.0, 1.0, 1.0, 1.0))
       
self.label.modify_font(Pango.font_description_from_string('font-size: 20'))
        parent.put(self.label, stream.rectangle.x, stream.rectangle.y +
stream_rectangle.height)
        self.log.info('[{}] Done label!'.format(stream.name))

        stream_rectangle = self.stream.rectangle
        self.set_size_request(stream_rectangle.width,
stream_rectangle.height)

        self.stream_connection_thread = None
        self.reconnecting = False
        self.lock = Lock()
        self.closed = False

        self.player = Gst.Pipeline.new('player')
        source = Gst.ElementFactory.make('rtspsrc', 'source')
        source.set_property('location', self.location)
        source.connect('pad-added', self.on_pad_added)
        jitter = Gst.ElementFactory.make('rtpjitterbuffer', 'jitter')
        depayer = Gst.ElementFactory.make('rtph264depay', 'depayer')
        decoder = Gst.ElementFactory.make('avdec_h264', 'decoder')
        converter = Gst.ElementFactory.make('videoconvert', 'converter')
        if conf.timers_overlay:
            timeoverlay = Gst.ElementFactory.make('timeoverlay',
'timeoverlay')
            clockoverlay = Gst.ElementFactory.make('clockoverlay',
'clockoverlay')
            clockoverlay.set_property('halignment', 'right')
        sink = Gst.ElementFactory.make('autovideosink', 'sink')
        sink.set_property('sync', False)

        self.player.add(source)
        self.player.add(jitter)
        self.player.add(depayer)
        self.player.add(decoder)
        self.player.add(converter)
        if conf.timers_overlay:
            self.player.add(timeoverlay)
            self.player.add(clockoverlay)
        self.player.add(sink)

        source.link(jitter)
        jitter.link(depayer)
        depayer.link(decoder)
        if conf.timers_overlay:
            decoder.link(timeoverlay)
            timeoverlay.link(clockoverlay)
            clockoverlay.link(converter)
        else:
            decoder.link(converter)
        converter.link(sink)

        self.bus = self.player.get_bus()
        self.bus.add_signal_watch()
        self.bus.connect('message::warning', self.on_warning)
        self.bus.connect('message::error', self.on_error)
        self.bus.connect('message::state-changed', self.on_status_changed)
        self.bus.connect('message::eos', self.on_eos)
        self.bus.enable_sync_message_emission()
        self.bus.connect('sync-message::element', self.set_frame_handle)
        self.restore_pipeline()
        if conf.restore_pipeline_period != 0:
            GObject.timeout_add_seconds(restore_pipeline_period * 60,
self.restore_pipeline)

    def set_online_label(self):
        self.log.info('[{}] {} is online'.format(self.stream.name,
self.stream.name))
        self.label.set_text(self.stream.name)

    def set_offline_label(self):
        self.log.info('[{}] {} is offline'.format(self.stream.name,
self.stream.name))
        self.label.set_text('{} is offline'.format(self.stream.name))

    def on_status_changed(self, bus, message):
        msg = message.parse_state_changed()
        self.log.info('[{}] status_changed message ->
{}'.format(self.stream.name, msg))
        if msg.newstate == Gst.State.PLAYING:
            self.set_online_label()
        else:
            self.set_offline_label()

    def send_eos(self):
        self.log.debug('[{}] Sending eos'.format(self.stream.name))
        self.player.send_event(Gst.Event.new_eos())

    def on_eos(self, bus, message):
        self.log.warning('[{}] eos message -> {}'.format(self.stream.name,
message))
        self.restore_pipeline()

    def on_warning(self, bus, message):
        self.log.warning('[{}] warning message ->
{}'.format(self.stream.name, message.parse_warning()))
        self.restore_pipeline()

    def on_error(self, bus, message):
        self.log.error('[{}] error message -> {}'.format(self.stream.name,
message.parse_error()))
        self.restore_pipeline()

    def activate_reconnection(self):
        connection_thread = self.stream_connection_thread
        self.log.warning('[{}] connection thread {}, player.current_state
{}'.format(self.stream.name, connection_thread, self.player.current_state))
        if self.player.current_state == Gst.State.NULL and \
                (connection_thread is None or (connection_thread and not
connection_thread.is_alive())):
            self.stream_connection_thread =
Thread(target=self.reconnect_stream)
            self.stream_connection_thread.start()
        self.log.warning('[{}] I supposedly lost the
connection!'.format(self.stream.name))

    def reconnect_stream(self):
        self.log.info('[{}] reconnecting the
stream!'.format(self.stream.name))
        while self.player.current_state != Gst.State.PLAYING:
            # Applied De Morgan rules !playing && !closed => !(playing or
closed)
            self.log.info('[{}] I\'m trying to
reconnect!'.format(self.stream.name))
            self.play()
            sleep(self.stream.sleep_time)
            self.log.info('[{}] Just woke up'.format(self.stream.name))

    def stop(self):
        self.log.info('[{}] Current state of my pipeline is
{}'.format(self.stream.name, self.player.current_state))
        self.log.info('[{}] setting pipeline state to
null'.format(self.stream.name))
        self.set_player_state(Gst.State.NULL)

    def play(self):
        self.log.info('[{}] Current state of my pipeline is
{}'.format(self.stream.name, self.player.current_state))
        self.log.info('[{}] setting pipeline state to
playing'.format(self.stream.name))
        outcome = self.set_player_state(Gst.State.PLAYING)
        if outcome == Gst.StateChangeReturn.ASYNC:
            self.log.warning('[{}] Got ASYNC state change in
return'.format(self.stream.name))

    def restore_pipeline(self):
        self.log.info('[{}] asking for the lock'.format(self.stream.name))
        self.lock.acquire()
        if not self.reconnecting:
            self.reconnecting = True
        else:
            self.lock.release()
            self.log.info('[{}] released the lock'.format(self.stream.name))
            return True
        self.lock.release()
        self.log.info('[{}] released the lock'.format(self.stream.name))

        while self.player.current_state != Gst.State.NULL:
            self.stop()
        self.set_offline_label()

        source = self.player.get_child_by_name('source')
        jitter = self.player.get_child_by_name('jitter')
        ret = source.unlink(jitter)
        self.log.info('[{}] unlinking source to jitter got me
{}'.format(self.stream.name, ret))

        self.activate_reconnection()

        self.log.info('[{}] asking for the lock'.format(self.stream.name))
        self.lock.acquire()
        self.reconnecting = False
        self.lock.release()
        self.log.info('[{}] released the lock'.format(self.stream.name))
        return True

    def on_pad_added(self, element, pad):
        self.log.warning('[{}] pad-added to {}'.format(self.stream.name,
pad.name))
        jitter = self.player.get_child_by_name('jitter')
        jitter_pad = jitter.get_static_pad('sink')
        retval = pad.link(jitter_pad)
        self.log.warning('[{}] linked pad got me
{}'.format(self.stream.name, retval))

    def set_player_state(self, state):
        outcome = self.player.set_state(state)
        self.log.info('[{}] Called status change to {} got me
{}'.format(self.stream.name, state, outcome))
        return outcome

    def close(self):
        self.closed = True
        self.player.set_state(Gst.State.NULL)

    def status(self):
        response = {
            'player_state': self.player.current_state.value_nick,
            'label': self.label.get_text()
        }

        recon_thread = self.stream_connection_thread

        if recon_thread:
            response['reconnection_thread'] = {
                'alive': recon_thread.is_alive(),
                'name': recon_thread.name,
                'daemon': recon_thread.daemon
            }

        pipeline = self.player

        response['pipeline'] = {
            'pads': {
                pad.name: get_pad_info(pad) for pad in pipeline.pads
            }
        }

        clock = pipeline.clock
        if clock:
            response['pipeline']['clock'] = {
                'time': clock.get_time(),
                'resolution': clock.get_resolution(),
                'timeout': clock.get_timeout(),
                'internal time': clock.get_internal_time(),
                'floating': clock.is_floating(),
                'synced': clock.is_synced(),
                'name': clock.get_name(),
                'calibration': clock.get_calibration()
            }

        response['elements'] = [
            {
                'name': child.name,
                'state': child.current_state.value_nick,
                'flags': child.flags,
                'pads': {
                    pad.name: get_pad_info(pad) for pad in child.pads
                }
            }
            for child in pipeline.children]

        return response

    def set_frame_handle(self, bus, message):
        if message.get_structure().get_name() == 'prepare-window-handle':
            frame = message.src
            frame.set_property('force-aspect-ratio', True)
            Gdk.threads_enter()
            frame.set_window_handle(self.get_window().get_xid())
            Gdk.threads_leave()





--
Sent from: http://gstreamer-devel.966125.n4.nabble.com/


More information about the gstreamer-devel mailing list