[Bug 769713] Unable to write multiple simultaneous filesinks from appsrc.

GStreamer (GNOME Bugzilla) bugzilla at gnome.org
Mon Aug 15 18:13:19 UTC 2016


https://bugzilla.gnome.org/show_bug.cgi?id=769713

--- Comment #2 from tyler at pictretail.com ---
If I run the two gst-launch instances on the command line they both work
completely fine. This was the first thing I tested as well.

The multiprocessing.Process interface is effectively calling fork().

Right now each multiprocessing.Process calls a "gstreamer writer function", and
we run gst_init() (gst.init(None) in Python) inside each of those processes.
Maybe that isn't the right way to be doing this?
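
To make the setup above concrete, here is a minimal sketch of how the writer
processes are launched. It is not our actual code: writer_stub, run_writers,
done_queue and the file names are hypothetical stand-ins, and the stub only
waits on exit_event instead of building a pipeline. It assumes a POSIX system,
since it asks for the fork start method explicitly.

```python
import multiprocessing as mp
import time


def writer_stub(exit_event, appname, filename, done_queue):
    """Hypothetical stand-in for gstreamer_writer: idles until exit_event is set."""
    # In the real function, gst.init(None) and the pipeline setup would run
    # here, i.e. inside the child process and after the fork.
    while not exit_event.is_set():
        time.sleep(0.01)
    done_queue.put((appname, filename))


def run_writers(targets, run_seconds=0.1):
    """Fork one writer process per (appname, filename) pair, then stop them all."""
    ctx = mp.get_context("fork")  # explicit fork; not available on Windows
    exit_event = ctx.Event()
    done_queue = ctx.Queue()
    procs = [
        ctx.Process(target=writer_stub,
                    args=(exit_event, appname, filename, done_queue))
        for appname, filename in targets
    ]
    for proc in procs:
        proc.start()

    time.sleep(run_seconds)
    exit_event.set()  # plays the role of the flag that triggers EOS

    # Drain the queue before joining so no child blocks on a full pipe.
    finished = sorted(done_queue.get(timeout=5) for _ in procs)
    for proc in procs:
        proc.join(timeout=5)
    return finished
```

Every child shares the same exit_event, so setting it once asks all writers to
shut down.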

Another person suggested adding queues before the h264 encoder and before the
filesink. They said: "Queue has two functions: + make a buffer before consuming
element (which encoder is - it needs many frames before it can start encoding)
+ separate further processing into new thread - so that filesink would process
data in new thread." Unfortunately this didn't fix our problem.
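
For reference, the queue suggestion amounts to a pipeline description along
these lines. This is only a sketch: our real string comes from
get_pipeline_command(), which is not shown here. build_pipeline_description and
its parameters are hypothetical, and the element chain (videoconvert, x264enc,
mp4mux) is an assumption about a typical BGR-to-mp4 pipeline.

```python
def build_pipeline_description(appname, filename, width, height, fps):
    """Build a gst-launch style description with a queue before the encoder
    and another queue before the filesink (hypothetical helper)."""
    caps = "video/x-raw,format=BGR,width={},height={},framerate={}/1".format(
        width, height, fps)
    elements = [
        "appsrc name={}".format(appname),
        caps,
        "videoconvert",
        "queue",      # buffers frames ahead of the encoder
        "x264enc",
        "mp4mux",
        "queue",      # lets the filesink run in its own streaming thread
        'filesink location="{}"'.format(filename),
    ]
    return " ! ".join(elements)
```

The resulting string is the kind of value that would be handed to
gst.parse_launch().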

Here is the code for the function, in case that helps. The arguments we pass
are:

ring - a ring buffer containing the np.array BGR frames.
reader - a reader that reads from the ring buffer.
stream_params - a class containing stream info like height/width of frame.
exit_event - a multiprocessing.Event() flag that is used to send the EOS signal
to the gstreamer pipeline and exit the process.
appname - the name of the appsrc element.
filename - the path/filename of the mp4 file we are writing with gstreamer.


import gi
gi.require_version('Gst', '1.0')
from gi.repository import GLib
from gi.repository import Gst as gst

import logging

import pod.framewriter.gstreamer_writer_params as gstreamer_writer_params
import pod.cameras.ringbuffer_camera_streamer as ringbuffer_camera_streamer

logger = logging.getLogger(__name__)


def gstreamer_writer(ring, reader, stream_params, exit_event, appname,
                     filename):
    """ Reads frames from a ring buffer, converts them to h264 encoded mp4 and
        writes the video to file. This function is designed to be called within
        a multiprocessing.Process() subprocess.

        Args:
        -ring: a ring buffer containing frames in BGR format.
        -reader: a reader that reads frames from the ring buffer.
        -stream_params: a stream parameter object that contains information
        about the video stream.
        -exit_event: a multiprocessing.Event() that will be set when the process
        is flagged to end
        -appname: the name for the gstreamer appsrc operation
        -filename: the full path + filename of the movie file that will be
        saved to file.
    """

    # Initialize GStreamer first, then enable debug output at level 1
    # (errors only).
    gst.init(None)
    gst.debug_set_active(True)
    gst.debug_set_default_threshold(1)

    # Initialize streamer to read frames from the ring buffer.
    streamer = ringbuffer_camera_streamer.RingbufferCameraMosaicStreamer(
        ring, reader, stream_params)

    # Generate gstreamer command to write BGR bytestream to mp4.
    gstreamer_params = (
        gstreamer_writer_params.GstreamerWriteParameters(stream_params,
                                                         appname, filename))
    command = gstreamer_params.get_pipeline_command()
    command = ' '.join(command)
    print(command)

    # Initialize gstreamer pipeline using generated gstreamer command.
    pipeline = gst.parse_launch(command)
    appsrc = pipeline.get_by_name(appname)

    # Create the main loop (it is run at the end of this function).
    mainloop = GLib.MainLoop()

    def need_data(src, need_bytes):
        """ Called whenever appsrc needs data. Pushes the next frame from the
            ring buffer into appsrc, or emits end-of-stream once exit_event
            is set.
        """
        # NOTE: eventually the streamer will pass a mosaic frame and not a
        # frame dict, so this code will have to change!
        ts, frame_dict = streamer.get_timestamp_and_frames()

        # Only the last frame in the dict is used; frame stays None if the
        # dict is empty.
        frame = None
        for feed_name in frame_dict:
            frame = frame_dict[feed_name]

        # If the exit flag is triggered emit end-of-stream message
        if exit_event.is_set():
            logger.info(
                "EXIT_EVENT triggered for reader with ID: " + str(id(reader)))
            src.emit("end-of-stream")
        # otherwise convert incoming BGR frames to bytes and push into buffer
        elif frame is not None and len(frame) > 0:
            buf = gst.Buffer.new_wrapped(frame.tobytes())
            src.emit("push-buffer", buf)

    # Add a listener for the need-data signal, which is emitted whenever
    # appsrc needs data.
    appsrc.connect("need-data", need_data)

    def on_message(bus, msg):
        """ Handle messages that are passed from the bus. When "exit_event" is
            triggered and the end-of-stream message is read, the mainloop will
            exit and the pipeline will be emptied to allow a clean tear down.

            Args:
            -bus: the pipeline's bus, which carries gstreamer messages
            -msg: gstreamer message.
        """
        msg_type = msg.type
        print(msg_type.get_name(msg_type))
        if (msg.type == gst.MessageType.EOS or
                msg.type == gst.MessageType.ERROR):
            mainloop.quit()
            pipeline.set_state(gst.State.PAUSED)
            pipeline.set_state(gst.State.READY)
            pipeline.set_state(gst.State.NULL)

    # Initialize a bus for message forwarding
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", on_message)

    # Start the pipeline and run the mainloop
    pipeline.set_state(gst.State.PLAYING)
    mainloop.run()
