Dynamic Sink Update with gst-python

doruk.snmz doruk898 at gmail.com
Wed Nov 4 08:32:43 UTC 2020


Hi all,

I am currently trying to change sinks on-the-fly with gstreamer-python. I
already have an implementation for the source update and it works like a
charm. However, when it comes to sink update, it is not the same thing as
what is happening with source update. When I run the code for the sink
update below, it displays the output but doesn't save the video. Any help or
idea or hint will be so appreciated! 

*Here is my code for the source update:*

import logging
from threading import Thread, Event

from tools.application_init import application_init

application_init()

from gi.repository import Gst, GLib
from tools.logging_pad_probe import logging_pad_probe
from tools.runner import Runner

log = logging.getLogger("main")

log.info("building pipeline")
pipeline = Gst.Pipeline.new()
caps = Gst.Caps.from_string("video/x-raw,width=640,height=480")

testsrc1 = Gst.ElementFactory.make("v4l2src", "testsrc1")
testsrc1.set_property("device", "/dev/video0")
pipeline.add(testsrc1)

mixer = Gst.ElementFactory.make("videomixer")
pipeline.add(mixer)
testsrc1.link_filtered(mixer, caps)

sink = Gst.ElementFactory.make("nveglglessink")
sink.set_property("sync", False)
pipeline.add(sink)
mixer.link_filtered(sink, caps)

testsrc1.get_static_pad("src").add_probe(
    Gst.PadProbeType.BUFFER, logging_pad_probe, "testsrc1-output")

mixer.get_static_pad("src").add_probe(
    Gst.PadProbeType.BUFFER, logging_pad_probe, "mixer-output")

testsrc2 = None  # (2)
capsfilter2 = None
mixerpad = None

def add_new_src():
    global testsrc2, capsfilter2, mixerpad
    Gst.debug_bin_to_dot_file_with_ts(pipeline, Gst.DebugGraphDetails.ALL,
"adding-testsrc2-before")
    log.info("Adding testsrc2")

    log.info("Creating testsrc2")
    testsrc2 = Gst.ElementFactory.make("v4l2src", "testsrc2")
    testsrc2.set_property("device", "/dev/video1")

    testsrc2.get_static_pad("src").add_probe(
        Gst.PadProbeType.BUFFER, logging_pad_probe, "testsrc2-output")

    log.info("Adding testsrc2")
    log.debug(pipeline.add(testsrc2))

    log.info("Creating capsfilter")
    capsfilter2 = Gst.ElementFactory.make("capsfilter", "capsfilter2")  #
(3)
    capsfilter2.set_property("caps", caps)

    log.info("Adding capsfilter")
    log.debug(pipeline.add(capsfilter2))

    log.info("Linking testsrc2 to capsfilter2")
    log.debug(testsrc2.link(capsfilter2))

    log.info("Requesting Pad from Mixer")
    mixerpad = mixer.get_request_pad("sink_%u")
    log.debug(mixerpad)

    log.info("Linking capsfilter2 to mixerpad")
    log.debug(capsfilter2.get_static_pad("src").link(mixerpad))

    log.info("Syncing Element-States with Pipeline")
    log.debug(capsfilter2.sync_state_with_parent())
    log.debug(testsrc2.sync_state_with_parent())

    log.info("Adding testsrc2 done")
    Gst.debug_bin_to_dot_file_with_ts(pipeline, Gst.DebugGraphDetails.ALL,
"adding-testsrc2-after")  # (4)

def remove_src():
    Gst.debug_bin_to_dot_file_with_ts(pipeline, Gst.DebugGraphDetails.ALL,
"removing-testsrc2-before")
    log.info("Removing testsrc2")

    log.info("Stopping testsrc2")
    log.debug(testsrc2.set_state(Gst.State.NULL))  # (5)

    log.info("Stopping capsfilter2")
    log.debug(capsfilter2.set_state(Gst.State.NULL))

    log.info("Removing testsrc2")
    log.debug(pipeline.remove(testsrc2))

    log.info("Removing capsfilter2")
    log.debug(pipeline.remove(capsfilter2))

    log.info("Releasing mixerpad")
    log.debug(mixer.release_request_pad(mixerpad))  # (6)

    log.info("Removing testsrc2 done")
    Gst.debug_bin_to_dot_file_with_ts(pipeline, Gst.DebugGraphDetails.ALL,
"removing-testsrc2-after")


stop_event = Event()  # (1)

def timed_sequence():
    log.info("Starting Sequence")
    while True:
        if stop_event.wait(2): return
        log.info("Schedule Add Source")
        GLib.idle_add(add_new_src)

        if stop_event.wait(2): return
        log.info("Schedule Remove Source")
        GLib.idle_add(remove_src)


t = Thread(target=timed_sequence, name="Sequence")
t.start()

runner = Runner(pipeline)
runner.run_blocking()

stop_event.set()
t.join()

*And this is the code for the sink update:*

#!/usr/bin/env python
import logging
from threading import Thread, Event

from tools.application_init import application_init

application_init()

from gi.repository import Gst, GLib
from tools.logging_pad_probe import logging_pad_probe
from tools.runner import Runner

log = logging.getLogger("main")

log.info("building pipeline")
pipeline = Gst.Pipeline.new()
bus = pipeline.get_bus()
caps = Gst.Caps.from_string("video/x-raw,width=640,height=480")

testsrc1 = Gst.ElementFactory.make("v4l2src", "testsrc1")
testsrc1.set_property("device", "/dev/video0")
pipeline.add(testsrc1)

mixer = Gst.ElementFactory.make("videomixer")
pipeline.add(mixer)
testsrc1.link_filtered(mixer, caps)

sink1 = Gst.ElementFactory.make("nveglglessink", "testsink1")
sink1.set_property("sync", False)
pipeline.add(sink1)
mixer.link_filtered(sink1, caps)

testsrc1.get_static_pad("src").add_probe(
    Gst.PadProbeType.BUFFER, logging_pad_probe, "testsrc1-output")

mixer.get_static_pad("src").add_probe(
    Gst.PadProbeType.BUFFER, logging_pad_probe, "mixer-output")

#testsrc2 = None  # (2)
sink2 = None  # (2)
mixerpad = None

def add_new_sink():
    global testsink2, mixerpad
    Gst.debug_bin_to_dot_file_with_ts(pipeline, Gst.DebugGraphDetails.ALL,
"adding-sink2-before")
    log.info("Adding sink2")

    log.info("Creating encoder")
    encoder = Gst.ElementFactory.make("omxh264enc", "enc1")

    encoder.get_static_pad("src").add_probe(
        Gst.PadProbeType.BUFFER, logging_pad_probe, "testsrc2-output")

    log.info("Adding encoder")
    log.debug(pipeline.add(encoder))

    log.info("Creating muxer")
    muxer = Gst.ElementFactory.make("mpegtsmux", "mux1")

    log.info("Adding muxer")
    log.debug(pipeline.add(muxer))

    log.info("Creating sink2")
    sink2 = Gst.ElementFactory.make("filesink", "sink2")
    sink2.set_property("location", "output.ts")

    log.info("Adding sink2")
    log.debug(pipeline.add(sink2))

    log.info("Linking videomixer to encoder")
    log.debug(mixer.link(encoder))

    log.info("Requesting Pad from Muxer")
    muxerpad = muxer.get_request_pad("sink_%d")
    log.debug(muxerpad)

    log.info("Linking encoder to muxerpad")
    log.debug(encoder.get_static_pad("src").link(muxerpad))

    log.info("Linking muxer to sink2")
    log.debug(muxer.link(sink2))

    log.info("Syncing Element-States with Pipeline")
    log.debug(encoder.sync_state_with_parent())
    log.debug(muxer.sync_state_with_parent())
    log.debug(sink2.sync_state_with_parent())

    log.info("Adding sink2 done")
    Gst.debug_bin_to_dot_file_with_ts(pipeline, Gst.DebugGraphDetails.ALL,
"adding-testsrc2-after")  # (4)

def remove_sink():
    Gst.debug_bin_to_dot_file_with_ts(pipeline, Gst.DebugGraphDetails.ALL,
"removing-testsrc2-before")
    log.info("Removing encoder")
    log.info("Removing muxer")
    log.info("Removing sink2")

    log.info("Stopping encoder")
    log.debug(encoder.set_state(Gst.State.NULL))  # (5)

    log.info("Stopping muxer")
    log.debug(muxer.set_state(Gst.State.NULL))

    log.info("Stopping sink2")
    log.debug(sink2.set_state(Gst.State.NULL))

    log.info("Removing encoder")
    log.debug(pipeline.remove(encoder))

    log.info("Removing muxer")
    log.debug(pipeline.remove(muxer))

    log.info("Removing sink2")
    log.debug(pipeline.remove(sink2))

    log.info("Releasing mixerpad")
    log.debug(muxer.release_request_pad(muxerpad))  # (6)

    log.info("Removing sink2 done")
    Gst.debug_bin_to_dot_file_with_ts(pipeline, Gst.DebugGraphDetails.ALL,
"removing-testsrc2-after")


stop_event = Event()  # (1)

def timed_sequence():
    log.info("Starting Sequence")
    while True:
        if stop_event.wait(2): return
        log.info("Schedule Add Source")
        GLib.idle_add(add_new_sink)

        if stop_event.wait(2): return
        log.info("Schedule Remove Source")
        GLib.idle_add(remove_sink)


t = Thread(target=timed_sequence, name="Sequence")
t.start()

runner = Runner(pipeline)
runner.run_blocking()
"""
log.info("Sending an EOS event to the pipeline")
pipeline.send_event(Gst.Event.new_eos())
log.info("Waiting for the EOS message on the bus")
bus.timed_pop_filtered(Gst.CLOCK_TIME_NONE, Gst.MessageType.EOS)
"""
stop_event.set()
t.join()



--
Sent from: http://gstreamer-devel.966125.n4.nabble.com/


More information about the gstreamer-devel mailing list