<div dir="ltr">Hello!<div>I'm trying to destroy pipeline by setting NULL state in error message handler, but there is two sockets leak in each turn...</div><div>Am I wrong and why?<br><div><br></div><div>Code:</div><div>--</div><div><div>#!/usr/bin/python</div><div># -*- encoding: utf-8 -*-</div><div><br></div><div>import sys, os, errno, socket, time#!/usr/bin/python</div><div># -*- encoding: utf-8 -*-</div><div><br></div><div>import sys, os, errno, socket, time</div><div><br></div><div>import gi</div><div>gi.require_version('Gst', '1.0')</div><div>from gi.repository import Gst, GObject</div><div><br></div><div>Gst.CLOCK_TIME_NONE = 2**64-1 # Fix, g-i (GstClockTime (-1))</div><div><br></div><div>def DEBUG(*x):</div><div><span class="" style="white-space:pre"> </span>print >>sys.stderr, "DEBUG:", ", ".join(str(e) for e in x)</div><div><br></div><div>class Leak():</div><div>    pipe = """</div><div>                 filesrc location=/tmp/leak-.mkv !</div><div>                 filesink location=/tmp/leak+.mkv</div><div><br></div><div>"""</div><div>    def __init__(self):</div><div>        self.build()</div><div><br></div><div>    def build(self):</div><div><span class="" style="white-space:pre">  </span>self.pipeline = Gst.parse_launch(self.pipe)</div><div><span class="" style="white-space:pre">        </span>DEBUG("NEW pipeline: %x" % id(self.pipeline))</div><div><br></div><div>        def onMessage(bus, message, self):</div><div>            if message.type == Gst.MessageType.ERROR:</div><div>                err, msg = message.parse_error()</div><div>                print >> sys.stderr, err.message</div><div><span class="" style="white-space:pre">                </span>self.shutdown()</div><div><span class="" style="white-space:pre">            </span>return False</div><div><br></div><div>            if message.type == Gst.MessageType.EOS:</div><div>                self.pipeline.set_state(Gst.State.NULL)</div><div>            <span class="" style="white-space:pre">      </span>DEBUG("onMessage", message.type)</div><div><br></div><div>            DEBUG("onMessage", message.type)</div><div><br></div><div>            return True</div><div>                </div><div><span class="" style="white-space:pre">        </span>self.pipeline.set_auto_flush_bus(True)</div><div>        bus = self.pipeline.get_bus()</div><div>        bus.connect("message", onMessage, self)</div><div>        bus.add_signal_watch()</div><div><br></div><div>    def play(self):</div><div>        if not self.pipeline:</div><div><span class="" style="white-space:pre">           </span>self.build()</div><div><br></div><div><span class="" style="white-space:pre">      </span>print self.pipeline.get_state(0)</div><div>        self.pipeline.set_state(Gst.State.PLAYING)</div><div>        return True</div><div><span class="" style="white-space:pre">        </span></div><div>    def shutdown(self):</div><div><span class="" style="white-space:pre">   </span>#leak.pipeline.set_state(Gst.State.READY)</div><div><span class="" style="white-space:pre">  </span>leak.pipeline.set_state(Gst.State.NULL)</div><div><span class="" style="white-space:pre">    </span>leak.pipeline = None</div><div><span class="" style="white-space:pre">       </span>DEBUG("ERROR, pipeline destroyed")</div><div><br></div><div>GObject.threads_init()</div><div>loop = GObject.MainLoop()</div><div>Gst.init(None)</div><div><br></div><div>leak = Leak()</div><div>leak.play()</div><div><br></div><div>GObject.timeout_add(8000, Leak.play, leak)</div><div><br></div><div>try:</div><div>  loop.run()</div><div>except KeyboardInterrupt as e:</div><div>   pass</div><div>finally:</div><div>  leak.shutdown()</div><div><br></div><div>import gi</div><div>gi.require_version('Gst', '1.0')</div><div>from gi.repository import Gst, GObject</div><div><br></div><div>Gst.CLOCK_TIME_NONE = 2**64-1 # Fix, g-i (GstClockTime (-1))</div><div><br></div><div>def DEBUG(*x):</div><div><span class="" style="white-space:pre">       </span>print >>sys.stderr, "DEBUG:", ", ".join(str(e) for e in x)</div><div><br></div><div>class Leak():</div><div>    pipe = """</div><div>                 filesrc location=/tmp/leak-.mkv !</div><div>                 filesink location=/tmp/leak+.mkv</div><div><br></div><div>"""</div><div>    def __init__(self):</div><div>        self.build()</div><div><br></div><div>    def build(self):</div><div><span class="" style="white-space:pre">  </span>self.pipeline = Gst.parse_launch(self.pipe)</div><div><span class="" style="white-space:pre">        </span>DEBUG("NEW pipeline: %x" % id(self.pipeline))</div><div><br></div><div>        def onMessage(bus, message, self):</div><div>            if message.type == Gst.MessageType.ERROR:</div><div>                err, msg = message.parse_error()</div><div>                print >> sys.stderr, err.message</div><div><span class="" style="white-space:pre">                </span>self.shutdown()</div><div><span class="" style="white-space:pre">            </span>return False</div><div><br></div><div>            if message.type == Gst.MessageType.EOS:</div><div>                self.pipeline.set_state(Gst.State.NULL)</div><div>            <span class="" style="white-space:pre">      </span>DEBUG("onMessage", message.type)</div><div><br></div><div>            DEBUG("onMessage", message.type)</div><div><br></div><div>            return True</div><div>                </div><div><span class="" style="white-space:pre">        </span>self.pipeline.set_auto_flush_bus(True)</div><div>        bus = self.pipeline.get_bus()</div><div>        bus.connect("message", onMessage, self)</div><div>        bus.add_signal_watch()</div><div><br></div><div>    def play(self):</div><div>        if not self.pipeline:</div><div><span class="" style="white-space:pre">           </span>self.build()</div><div><br></div><div><span class="" style="white-space:pre">      </span>print self.pipeline.get_state(0)</div><div>        self.pipeline.set_state(Gst.State.PLAYING)</div><div>        return True</div><div><span class="" style="white-space:pre">        </span></div><div>    def shutdown(self):</div><div><span class="" style="white-space:pre">   </span>#leak.pipeline.set_state(Gst.State.READY)</div><div><span class="" style="white-space:pre">  </span>leak.pipeline.set_state(Gst.State.NULL)</div><div><span class="" style="white-space:pre">    </span>leak.pipeline = None</div><div><span class="" style="white-space:pre">       </span>DEBUG("ERROR, pipeline destroyed")</div><div><br></div><div>GObject.threads_init()</div><div>loop = GObject.MainLoop()</div><div>Gst.init(None)</div><div><br></div><div>leak = Leak()</div><div>leak.play()</div><div><br></div><div>GObject.timeout_add(8000, Leak.play, leak)</div><div><br></div><div>try:</div><div>  loop.run()</div><div>except KeyboardInterrupt as e:</div><div>   pass</div><div>finally:</div><div>  leak.shutdown()</div><div>--</div></div></div></div>