"""Ogg audio playback (Player) and capture (Recorder) using the ``gst`` bindings.

Run as a script, the module starts a GTK main loop that periodically
alternates between recording audio and playing back what was recorded.
"""
import logging
from collections import deque

import gst

# Codec name prefix: the elements "<codec>dec" and "<codec>enc" are created
# for decoding and encoding respectively.
codec = 'vorbis'


class Player:
    """Play Ogg data that is pushed in from memory via an ``appsrc`` element.

    Pipeline: appsrc -> oggdemux -> <codec>dec -> audioconvert -> autoaudiosink
    """

    def __init__(self):
        """Build the playback pipeline and start watching its message bus."""
        self._logger = logging.getLogger("rawspeex.Player")
        self._pipe = gst.Pipeline("play-pipeline")

        self._source = gst.element_factory_make("appsrc")
        self._source.props.caps = gst.Caps("application/ogg")
        self._source.props.max_bytes = 2**21  # buffer up to 2 MB
        # throw away new audio once the buffer is full
        self._source.props.block = False
        #self._source.props.is_live = True
        # standin for GST_APP_STREAM_TYPE_STREAM
        self._source.props.stream_type = 0

        demux = gst.element_factory_make("oggdemux")
        # oggdemux doesn't know its output type until the pipeline starts, so
        # it can't be statically linked to the decoder. Therefore, we
        # introduce __make_link_cb to complete the connection.
        self._link_handler = demux.connect("pad-added", self.__make_link_cb)

        self._decoder = gst.element_factory_make(codec + "dec")
        self._converter = gst.element_factory_make("audioconvert")
        self._sink = gst.element_factory_make("autoaudiosink")

        self._pipe.add(self._source, demux, self._decoder, self._converter,
                       self._sink)
        # The demux -> decoder link is deliberately missing here; it is made
        # later by __make_link_cb.
        gst.element_link_many(self._source, demux)
        gst.element_link_many(self._decoder, self._converter, self._sink)

        bus = self._pipe.get_bus()
        bus.add_signal_watch()
        bus.connect("message", self.__message_cb)

    def enqueue(self, oggbytes):
        """Push a chunk of Ogg data into the pipeline's appsrc.

        oggbytes -- the raw Ogg data; it is wrapped in a gst.Buffer.

        A flow status other than gst.FLOW_OK is reported on stdout; nothing
        is raised or returned.
        """
        print("Pipe's state is %s" % repr(self._pipe.get_state()))
        flow = self._source.emit('push-buffer', gst.Buffer(oggbytes))
        # Report a failed push (e.g. when the buffer has filled up) rather
        # than dropping the data silently.
        if flow != gst.FLOW_OK:
            print("Got unexpected flow status %r when trying to "
                  "push a buffer of %d bytes" % (flow, len(oggbytes)))

    def play(self):
        """Set the pipeline to PLAYING and send an EOS event to the source."""
        self._pipe.set_state(gst.STATE_PLAYING)
        self._source.send_event(gst.event_new_eos())

    def pause(self):
        """Set the pipeline to PAUSED."""
        self._pipe.set_state(gst.STATE_PAUSED)

    def is_playing(self):
        """Return True if the pipeline's current state is PLAYING."""
        # Query the state once so the value printed is the value tested.
        state = self._pipe.get_state()
        print(state)
        return state[1] == gst.STATE_PLAYING

    def __make_link_cb(self, demux, src_pad):
        """Link a newly added demuxer pad to the decoder's sink pad.

        Connected to the demuxer's "pad-added" signal; the handler
        disconnects itself after making the link.
        """
        print("complete link")
        sink_pad = self._decoder.get_pad("sink")
        src_pad.link(sink_pad)
        demux.disconnect(self._link_handler)
        print("completed link")

    def __message_cb(self, bus, msg):
        """Print every bus message and reset the pipeline on end-of-stream."""
        print(msg)
        if msg.type == gst.MESSAGE_EOS:
            print("Got EOS")
            # Drop to NULL, then come back up to READY.
            self._pipe.set_state(gst.STATE_NULL)
            self._pipe.set_state(gst.STATE_READY)


class Recorder:
    """Capture audio and return it as Ogg-muxed data.

    Pipeline: autoaudiosrc -> audioconvert -> <codec>enc -> oggmux -> appsink
    """

    def __init__(self):
        """Create the logger and build the first recording pipeline."""
        self._logger = logging.getLogger("rawspeex.Recorder")
        self._build_pipeline()

    # I tried to make this work without rebuilding the pipeline all the time,
    # but it seems that the audio encoders have not been written to support
    # that sort of restarting.
    def _build_pipeline(self):
        """Create, add and link all elements of a fresh recording pipeline."""
        self._pipe = gst.Pipeline("record-pipeline")
        self._source = gst.element_factory_make("autoaudiosrc")
        self._converter = gst.element_factory_make("audioconvert")
        self._encoder = gst.element_factory_make(codec + "enc")
        self._mux = gst.element_factory_make("oggmux")
        self._sink = gst.element_factory_make("appsink")
        self._sink.props.caps = gst.Caps("application/ogg")

        self._pipe.add(self._source, self._converter, self._encoder,
                       self._mux, self._sink)
        gst.element_link_many(self._source, self._converter, self._encoder,
                              self._mux, self._sink)

    def _destroy_pipeline(self):
        """Set every element and the pipeline to NULL, then drop the references."""
        # Elements first, the containing pipeline last.
        for element in (self._source, self._converter, self._encoder,
                        self._mux, self._sink, self._pipe):
            element.set_state(gst.STATE_NULL)

        del self._source
        del self._converter
        del self._encoder
        del self._mux
        del self._sink
        del self._pipe

    def record(self):
        """Start capturing by setting the pipeline to PLAYING."""
        self._pipe.set_state(gst.STATE_PLAYING)

    def stop(self):
        """Stop capturing and return everything recorded as one string.

        Sends EOS to the source, pulls buffers from the appsink until
        _get_buffer() returns a false value, then replaces the pipeline
        with a fresh one.
        """
        self._source.send_event(gst.event_new_eos())

        buffers = []
        b = self._get_buffer()
        while b:
            buffers.append(str(b))
            b = self._get_buffer()

        self._destroy_pipeline()
        # Do this here so that record() can be fast for UI reasons
        self._build_pipeline()
        return ''.join(buffers)

    def is_recording(self):
        """Return True if the pipeline's current state is PLAYING."""
        return self._pipe.get_state()[1] == gst.STATE_PLAYING

    def _get_buffer(self):
        """Pull one buffer from the appsink; return None on SystemError."""
        # See https://thomas.apestaart.org/thomas/trac/browser/tests/gst/crc/crc.py
        # for the source of this concept and an explanation of why it makes
        # sense
        try:
            return self._sink.emit('pull-buffer')
        except SystemError as e:
            self._logger.warning(
                'pygst bindings are buggy, producing error %s', e)
            return None


if __name__ == '__main__':
    import time

    import gobject
    import gtk

    p = Player()
    r = Recorder()

    def toggle_recording():
        """Alternate between recording and playing back the recording."""
        if r.is_recording():
            s = r.stop()
            print("Got %d bytes" % len(s))
            p.enqueue(s)
            p.play()
        else:
            # NOTE(review): this sleep blocks the main loop for a second;
            # presumably a deliberate pause before recording starts - confirm.
            time.sleep(1)
            r.record()
            print("Started recording")
        # Returning True keeps the timeout scheduled.
        return True

    gobject.timeout_add(5000, toggle_recording)
    gtk.main()