gstreamer python binding memory leak

neil yannian89 at 163.com
Mon Oct 21 07:18:29 UTC 2019


I encountered a memory leak when repeatedly starting and stopping a GStreamer
pipeline in my project. I then reproduced the bug using the following code snippet.

I found that the finalize functions of the pipeline and its elements are not
called after setting the pipeline to NULL.

Could you give me some suggestions on how to debug this? Thanks.


#!/usr/bin/env python3

import sys
import gi
import time
import objgraph
gi.require_version('Gst', '1.0')
import os
import psutil
from gi.repository import Gst, GObject
import threading
from memory_profiler import profile
import logging
from collections import defaultdict
import weakref

class KeepRefs(object):
    """Mixin that tracks the live instances of each concrete subclass.

    Instances are recorded through weak references, so being tracked does
    not keep an object alive. Subclasses must call ``KeepRefs.__init__``
    (e.g. via ``super().__init__()``) for the instance to be registered.
    """

    # Maps each concrete class to the weak references of its instances.
    # Shared by every subclass because it lives on the base class.
    __refs__ = defaultdict(list)

    def __init__(self):
        """Register ``self`` in the per-class registry."""
        registry = self.__refs__[self.__class__]
        # The weakref callback receives the (now dead) reference object and
        # removes it from the registry, so dead entries do not pile up when
        # many instances are created and destroyed.
        registry.append(weakref.ref(self, registry.remove))

    @classmethod
    def get_instances(cls):
        """Yield every instance of exactly ``cls`` that is still alive.

        Instances of subclasses of ``cls`` are registered under their own
        class and are therefore not included.
        """
        # ``.get`` avoids creating an empty registry entry as a side effect
        # of a read; the tuple snapshot tolerates entries being removed by
        # weakref callbacks while the generator is being consumed.
        for inst_ref in tuple(cls.__refs__.get(cls, ())):
            inst = inst_ref()
            if inst is not None:
                yield inst

# http://docs.gstreamer.com/display/GstSDK/Basic+tutorial+3%3A+Dynamic+pipelines


class Player(KeepRefs):
    """GStreamer pipeline that streams the H.264 track of an MP4 file to RTMP.

    ``__init__`` builds and starts the following pipeline::

        filesrc -> qtdemux ~> h264parse -> flvmux -> rtmpsink

    The ``qtdemux ~> h264parse`` link is not made at construction time; it is
    made in :meth:`on_pad_added` when the demuxer emits ``pad-added``.
    Call :meth:`stop_pipeline` to shut the pipeline down and take it apart.
    """

    # Defaults used when the caller does not pass explicit locations.
    DEFAULT_SOURCE_LOCATION = '/home/neil/Videos/car.mp4'
    DEFAULT_SINK_LOCATION = 'rtmp://172.16.203.178:1935/preview/video'

    def __init__(self, name, source_location=DEFAULT_SOURCE_LOCATION,
                 sink_location=DEFAULT_SINK_LOCATION):
        """Build the pipeline, set it to PLAYING and wait three seconds.

        Args:
            name: Identifier of this player; stored as ``name`` and ``id``.
            source_location: Path of the MP4 file handed to ``filesrc``.
            sink_location: RTMP URL handed to ``rtmpsink``.

        Exits the process with status 1 if an element cannot be created, the
        static links cannot be made, or the state change fails.
        """
        super(Player, self).__init__()
        self.name = name
        self.id = name
        self.start_time = time.time()
        self.status = 'created'

        # create the empty pipeline and the elements
        self.pipeline = Gst.Pipeline.new("test-pipeline")
        self.source = Gst.ElementFactory.make("filesrc", "source")
        self.demux = Gst.ElementFactory.make("qtdemux", "demux")
        self.h264parse = Gst.ElementFactory.make("h264parse", "h264parse")
        self.mux = Gst.ElementFactory.make('flvmux', 'mux')
        self.sink = Gst.ElementFactory.make("rtmpsink", "sink")

        elements = (self.source, self.demux, self.h264parse,
                    self.mux, self.sink)

        # Validate before anything is used, so a failed factory call is
        # reported instead of surfacing as an AttributeError on None.
        if self.pipeline is None or any(e is None for e in elements):
            logging.error("ERROR: Could not create all elements")
            sys.exit(1)

        # To trace native finalization while debugging, GObject weak refs
        # can be attached here, e.g.:
        #     self.pipeline.weak_ref(callback, 'pipeline')

        # Watch the bus. The handler ids are kept so that stop_pipeline()
        # can disconnect the handlers again.
        # NOTE(review): the handlers are bound methods, so the bus presumably
        # keeps a reference to this Player while they are connected - confirm
        # with the weak_ref tracing mentioned above.
        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self._bus_handler_ids = [
            self.bus.connect('message::error', self.on_error),
            self.bus.connect('message::eos', self.on_eos),
            self.bus.connect('message::state-changed', self.on_state_changed),
        ]

        # build the pipeline. we are NOT linking demux -> h264parse at this
        # point; that happens in on_pad_added()
        for element in elements:
            self.pipeline.add(element)
        if not self.source.link(self.demux) or \
                not self.h264parse.link(self.mux) or \
                not self.mux.link(self.sink):
            logging.error("ERROR: Could not link the pipeline elements")
            sys.exit(1)

        # set the file to read and the RTMP endpoint to publish to
        self.source.set_property('location', source_location)
        self.sink.set_property('location', sink_location)

        # connect to the pad-added signal; h264parse is passed as user data
        self._pad_added_handler_id = self.demux.connect_data(
            "pad-added", self.on_pad_added, self.h264parse)

        # start playing
        ret = self.pipeline.set_state(Gst.State.PLAYING)
        if ret == Gst.StateChangeReturn.FAILURE:
            logging.error(
                "ERROR: Unable to set the pipeline to the playing state")
            sys.exit(1)
        self.status = 'playing'
        time.sleep(3)

    def stop_pipeline(self):
        """Set the pipeline to NULL, detach all handlers and dismantle it.

        Operates on this instance only. Calling it a second time is a no-op.
        Sleeps five seconds at the end, as the original script did.
        """
        if self.status == 'stopped':
            return

        self.pipeline.set_state(Gst.State.NULL)

        # Disconnect every handler registered in __init__ before removing
        # the signal watch.
        for handler_id in self._bus_handler_ids:
            self.bus.disconnect(handler_id)
        self._bus_handler_ids = []
        self.bus.remove_signal_watch()

        if self._pad_added_handler_id is not None:
            self.demux.disconnect(self._pad_added_handler_id)
            self._pad_added_handler_id = None

        # unlink neighbours, then take the elements out of the pipeline
        chain = (self.source, self.demux, self.h264parse, self.mux, self.sink)
        for upstream, downstream in zip(chain, chain[1:]):
            upstream.unlink(downstream)
        for element in chain:
            self.pipeline.remove(element)

        self.status = 'stopped'
        time.sleep(5)

    def on_eos(self, bus, msg):
        """Log that an end-of-stream message arrived on the bus."""
        logging.info('receive eos message')

    # ref: https://gstreamer.freedesktop.org/documentation/design/messages.html?gi-language=c#message-types
    # Receiving GST_MESSAGE_ERROR usually means that part of the pipeline is
    # not streaming anymore, so stop the stream and let plmgr restart the
    # instance
    def on_error(self, bus, msg):
        """Log the parsed content of an error message from the bus."""
        logging.error("PIPELINE[%s] has an error, ERROR:%s",
                      self.id, msg.parse_error())

    def on_state_changed(self, bus, msg):
        """Print state transitions of the pipeline itself.

        State changes posted by the individual elements are ignored.
        """
        if msg.src != self.pipeline:
            return

        old, new, _pending = msg.parse_state_changed()
        print('PIPELINE[{}] status change[{} -> {}]'.format(self.id, old, new))

    # handler for the pad-added signal
    def on_pad_added(self, src, new_pad, sink):
        """Link a newly added H.264 pad of ``src`` to the sink pad of ``sink``.

        Args:
            src: Element that emitted ``pad-added`` (the demuxer).
            new_pad: The pad that was just added to ``src``.
            sink: User data given at connect time; the element whose static
                ``"sink"`` pad should receive the stream.
        """
        sink_pad = sink.get_static_pad("sink")
        print("Received new pad '{0:s}' from '{1:s}'".format(
            new_pad.get_name(), src.get_name()))

        # if the parser is already linked, we have nothing to do here
        if sink_pad.is_linked():
            print("We are already linked. Ignoring.")
            return

        # check the new pad's type; fall back to a caps query when no caps
        # have been negotiated on the pad yet
        new_pad_caps = new_pad.get_current_caps()
        if new_pad_caps is None:
            new_pad_caps = new_pad.query_caps(None)
        if new_pad_caps is None or new_pad_caps.get_size() == 0:
            print("New pad has no usable caps. Ignoring.")
            return
        new_pad_type = new_pad_caps.get_structure(0).get_name()

        if not new_pad_type.startswith("video/x-h264"):
            print("It has type '{0:s}' which is not H.264 video. "
                  "Ignoring.".format(new_pad_type))
            return

        # attempt the link
        ret = new_pad.link(sink_pad)
        if ret != Gst.PadLinkReturn.OK:
            logging.error(
                "Type is '{0:s}' but link failed".format(new_pad_type))
        else:
            logging.debug(
                "Link succeeded (type '{0:s}')".format(new_pad_type))


# @profile
def create_player(name, run_seconds=15):
    """Create a ``Player`` and let it run for a while before returning it.

    Args:
        name: Identifier passed to the ``Player`` constructor.
        run_seconds: How long to sleep after construction so the pipeline
            can stream; defaults to the 15 seconds originally hard-coded.

    Returns:
        The newly created ``Player``.
    """
    player = Player(name)
    time.sleep(run_seconds)
    return player


# @profile
def stop_player(p, settle_seconds=5):
    """Stop a player's pipeline and wait before returning.

    Args:
        p: Object exposing ``stop_pipeline()`` (a ``Player``).
        settle_seconds: How long to sleep after the pipeline was stopped;
            defaults to the 5 seconds originally hard-coded.
    """
    p.stop_pipeline()
    time.sleep(settle_seconds)


def get_memory():
    """Return the resident set size (RSS) of this process in megabytes."""
    current_process = psutil.Process(os.getpid())
    rss_bytes = current_process.memory_info().rss
    # bytes -> KiB -> MiB; the float divisor keeps the result fractional
    return rss_bytes / 1024 / 1024.0


if __name__ == '__main__':
    # Script entry point: start and stop a player 100 times and print the
    # process RSS before start, after start and after stop of every round.
    import gc

    Gst.init(None)
    # GObject.threads_init() is intentionally not called: it is deprecated
    # and no longer needed with current PyGObject releases.
    for i in range(100):
        print(i, 'memory before start', get_memory(), 'MB')
        pl = create_player(i)
        print(i, 'memory after start', get_memory(), 'MB')
        stop_player(pl)
        # Drop the last Python reference to the player (and through it to
        # the pipeline, bus and elements) and collect reference cycles
        # before measuring. While ``pl`` is still bound, the wrapped objects
        # cannot be finalized, so the "after stop" figure would include them.
        del pl
        gc.collect()
        print(i, 'memory after stop', get_memory(), 'MB')



--
Sent from: http://gstreamer-devel.966125.n4.nabble.com/


More information about the gstreamer-devel mailing list