Multisocket sink and httpserver: how to terminate connection

Krutskikh Ivan stein.hak at gmail.com
Tue Feb 16 18:12:46 UTC 2016


Hi,
I'm writing a test video distributor tool for serving gstreamer mpeg-ts and
mjpeg streams over http connection. I'm using python basehttpserver and
gstreamer as core parts. I got over initial connection part very fast, I
can actually watch video from a number of vlc players just fine. However
when one of the players disconnect, I get 100% cpu usage for my tool and
socket still open due to some thread locking I think... ( since both
httpserver and handle loop intend to run in foreground). How can I solve
this issue?

I don't now if it's good practice to include source at email nor I don't
now what's the policy on file attachments on this list, so I tried to
minimize the example. This code distributes the test pipeline over http
very well but fails to handle disconnects. Thanks in advance!

import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gio, Gst

Gst.init(None)
GObject.threads_init()


class gst_broadcaster:

    def __init__(self):
        self.pipeline = Gst.parse_launch('videotestsrc name=source !
videoconvert ! videorate ! jpegenc name=encode ! multipartmux name=mux !
multisocketsink name=sink')
        self.source = self.pipeline.get_by_name('source')
        self.mux = self.pipeline.get_by_name('mux')
        self.sink = self.pipeline.get_by_name('sink')
        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus.connect('message',self.message_handler)
        self.sink.connect('client-added',self.add_client)
        self.sink.connect('client-socket-removed',self.remove_client)
        self.clients = []
        self.pipeline.set_state(Gst.State.READY)


    def kill(self):
        self.pipeline.set_state(Gst.State.NULL)

    def handles(self):
        return self.sink.get_property('num-handles')

    def add_client(self,sink,gsock):
        print 'Adding client socket:', gsock
        self.pipeline.set_state(Gst.State.PLAYING)
        self.clients.append(gsock)

    def remove_client(self,sink,gsock):
        print 'Removing socket:',gsock
        self.sink.emit('remove-flush',gsock)
        if gsock in self.clients:
            self.clients.remove(gsock)

    def handle(self, wfile):
        client_id = Gio.Socket().new_from_fd(wfile.fileno())
        print 'adding'
        self.sink.emit("add", client_id)
        while client_id in self.clients:
            print 'Waiting'
            time.sleep(1)
            #self.sink.emit("remove-flush", client_id)

    def cleanup(self,client_addr):
        if client_addr in self.clients:
            self.clients.remove(client_addr)

    def message_handler(self,bus,message):
        msgType = message.type
        if msgType == Gst.MessageType.ERROR:
            self.kill()
            print "\n Unable to play Video. Error: ", \
            message.parse_error()
        elif msgType == Gst.MessageType.EOS:
            self.kill()


player =  gst_broadcaster()

class GetHandler(BaseHTTPRequestHandler):


    def do_GET(self):
        self.send_response(200)
        self.send_header("Content-type", "multipart/x-mixed-replace")
        self.end_headers()
        print self.client_address
        try:
            print 'Adding handle'
            player.handle(self.wfile)
        except:
            print 'Exception'
            self.wfile = StringIO()



class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """Handle requests in a separate thread."""

class ForkingHTTPServer(ForkingMixIn,HTTPServer):
    """Handle requests in a separate process."""

def http_server():
    server = ThreadedHTTPServer(('0.0.0.0', 8080), GetHandler)
    print 'Starting http server\n'
    server.serve_forever()

http_server()
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.freedesktop.org/archives/gstreamer-devel/attachments/20160216/b0b38473/attachment.html>


More information about the gstreamer-devel mailing list