[gst-devel] appsink -> appsrc asynchronous help needed please.
Rohan
rohan at perzonae.com
Thu Nov 26 09:09:29 CET 2009
Hi all,
I have been wrestling with this problem, and should have asked earlier
about how to make it work, but it always seemed like the solution was
just around the corner.
I have a Python program that uses appsink and appsrc to write buffers
to an array, pop them off again, and display them. This is a prototype
to get the pipeline working, and it does.
The goal is to have the buffers pushed onto an array by an appsink,
then asynchronously popped off and processed (for example, sent over
the network using our protocol), and finally pushed into the appsrc,
which sends them to the display. The tests are actually being done
with sound, but the needed functionality is the same.
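A minimal sketch of the handoff described above, in plain Python with
no GStreamer involved. The names BufferQueue and process are
hypothetical, and the byte strings merely stand in for real buffers.
The idea is only that the producing side and the consuming side share
a locked FIFO rather than a bare list:

```python
import threading
from collections import deque


class BufferQueue(object):
    """Thread-safe FIFO standing in for the list shared by both sides."""

    def __init__(self):
        self._items = deque()
        self._lock = threading.Lock()

    def put(self, buf):
        # Called from the producing side (e.g. an appsink callback).
        with self._lock:
            self._items.append(buf)

    def get(self):
        # Called from the consuming side (e.g. an appsrc callback).
        # Returns None instead of raising when nothing is queued yet.
        with self._lock:
            if self._items:
                return self._items.popleft()
            return None


def process(buf):
    # Placeholder for the "stuff done to them" step, e.g. network transfer.
    return buf


pending = BufferQueue()
for chunk in (b"buf0", b"buf1", b"buf2"):
    pending.put(chunk)

received = []
while True:
    item = pending.get()
    if item is None:
        break
    received.append(process(item))
```

Returning None on an empty queue lets the consumer simply skip a
round when the producer has not delivered anything yet.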
If I do this synchronously it works, as in the script below:
-----------------------------------------------------------
#!/usr/bin/env python
import pygst
pygst.require("0.10")
import gst
# import gobject

class client(object):
    def __init__(self):
        self.buffers = []
        pipeline = gst.parse_launch('''audiotestsrc name=testsrc !
            decodebin ! audio/x-raw-int !
            audioconvert ! vorbisenc name=vorbisenc ! appsink name=sink
            sync=False''')
        sink = pipeline.get_by_name('sink')
        pipeline2 = gst.parse_launch(''' appsrc name=source !
            vorbisdec ! audioconvert ! alsasink''')
        source = pipeline2.get_by_name("source")
        pipeline.set_state(gst.STATE_PLAYING)
        pipeline2.set_state(gst.STATE_PLAYING)
        while True:
            try:
                # print "PULL BUFFER"
                buf = sink.emit('pull-buffer')
                self.buffers.append(buf)
                if self.buffers:
                    source.emit('push-buffer', self.buffers.pop(0))
            except SystemError, e:
                # it's probably a bug that emits triggers a SystemError
                print 'SystemError', e
                break

if __name__ == "__main__":
    client()
----------------------------------------------------------------
This works a treat, since it just sticks everything that must happen
into a while loop. You can imagine the various problems this has with
regard to performance and so on, but it is a great proof of concept.
The next step is to make this asynchronous, and this is where my woes
really began. I believe I have it narrowed down to a problem with the
caps information being passed, but I am lost as to what I am doing
wrong.
What happens at the moment is that the buffers are being pushed onto
the array, and either the 'need-data' or the 'push-buffer' signal is
getting triggered successfully. At that point I get this error:
(sound_asynchronous_appsink.py:1653): GStreamer-CRITICAL **:
gst_pad_set_caps: assertion `caps == NULL || gst_caps_is_fixed (caps)'
failed
and running it with --gst-debug-level=3 I have found this:
0:00:00.049407818 1684 0x8347348 INFO GST_ELEMENT_PADS
gstelement.c:970:gst_element_get_static_pad: no such pad 'src%d' in
element "decode"
0:00:00.281942486 1684 0x8347348 INFO GST_ELEMENT_PADS
gstutils.c:1208:gst_element_get_compatible_pad:<decode> Could not find a
compatible pad to link to capsfilter0:sink
0:00:00.281989280 1684 0x8347348 INFO default
gstutils.c:1872:gst_element_link_pads_filtered: Could not link elements
0:00:00.282014492 1684 0x8347348 INFO GST_STATES
gstelement.c:2238:gst_element_continue_state:<capsfilter0> completed
state change to NULL
and later,
0:00:00.282219407 1684 0x8347348 INFO GST_PIPELINE
./grammar.y:511:gst_parse_perform_link: linking audioconvert:(any) to
vorbisenc:(any) (0/0) with caps "(NULL)"
although later it seems happy with this.
And the place that had the error above has this now:
0:00:00.515774020 1684 0x84ecac0 INFO GST_STATES
gstelement.c:2251:gst_element_continue_state:<pipeline0> posting
state-changed PAUSED to PLAYING
(sound_asynchronous_appsink.py:1684): GStreamer-CRITICAL **:
gst_pad_set_caps: assertion `caps == NULL || gst_caps_is_fixed (caps)'
failed
0:00:00.531633509 1684 0xb6700468 INFO basesrc
gstbasesrc.c:2326:gst_base_src_loop:<source> pausing after
gst_pad_push() = not-negotiated
0:00:00.531698531 1684 0xb6700468 WARN basesrc
gstbasesrc.c:2378:gst_base_src_loop:<source> error: Internal data flow
error.
0:00:00.531718645 1684 0xb6700468 WARN basesrc
gstbasesrc.c:2378:gst_base_src_loop:<source> error: streaming task
paused, reason not-negotiated (-4)
0:00:00.531765090 1684 0xb6700468 INFO GST_ERROR_SYSTEM
gstelement.c:1763:gst_element_message_full:<source> posting message:
Internal data flow error.
0:00:00.531807972 1684 0xb6700468 INFO GST_ERROR_SYSTEM
gstelement.c:1786:gst_element_message_full:<source> posted error
message: Internal data flow error.
0:00:00.531866499 1684 0xb6700468 INFO GST_STATES
gstbin.c:2815:bin_handle_async_done:<pipeline1> committing state from
READY to PAUSED, old pending PLAYING
0:00:00.531892131 1684 0xb6700468 INFO GST_STATES
gstbin.c:2844:bin_handle_async_done:<pipeline1> continue state change,
pending PLAYING
0:00:00.531952055 1684 0x84ecac0 INFO GST_STATES
gstbin.c:2637:gst_bin_continue_func:<pipeline1> continue state change
PAUSED to PLAYING, final PLAYING
0:00:00.532047388 1684 0x84ecac0 WARN bin
gstbin.c:2312:gst_bin_do_latency_func:<pipeline1> failed to query latency
The code that produced these results is further down, but what I am
wondering is why this works as a synchronous chain, as above, but not
asynchronously. I have a small loop that prints out all the caps in
the first pipeline, and I have tried setting the appsrc caps to each
of these in turn, but still no luck, nor with something like "ANY" as
the caps. If I don't set the caps, which is closer to the first
example, it happily plays but there is no sound output.
I have read up on caps but am still not seeing what I am missing, and
I think this has something to do with my not fully understanding how
the caps are being used.
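For reference, the assertion in the error message checks that the caps
are "fixed", which in GStreamer means exactly one structure in which
no field holds a range or a list. Caps queried from a pad before
negotiation usually still describe everything the pad could accept,
so they may well fail that check. The toy model below is plain Python,
not GStreamer API: representing a range as a tuple and a list of
alternatives as a Python list is an assumption of this sketch, as are
the example field values.

```python
def is_fixed(caps):
    """Toy version of the rule behind gst_caps_is_fixed()."""
    # Fixed caps contain exactly one structure.
    if len(caps) != 1:
        return False
    media_type, fields = caps[0]
    # Every field must hold one concrete value, not a range or a list.
    return all(not isinstance(value, (list, tuple))
               for value in fields.values())


# Template-style caps: a range (tuple) and alternatives (list).
template_caps = [("audio/x-raw-int",
                  {"rate": (1, 2147483647), "channels": [1, 2], "width": 16})]

# Negotiated-style caps: one concrete value per field.
negotiated_caps = [("audio/x-raw-int",
                    {"rate": 44100, "channels": 1, "width": 16})]

template_fixed = is_fixed(template_caps)
negotiated_fixed = is_fixed(negotiated_caps)
```

Under this model only the second set of caps would pass the check,
which suggests that whatever is set on the appsrc should describe one
concrete format and not a whole set of possibilities.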
Any explanation of how caps should be passed around, and more
specifically of what I am doing wrong here, would be most
appreciated. I have googled and tried all sorts of examples, but it
is still not sinking in.
Rohan
Here is the other script:
-----------------------------------------------------------------------------
#!/usr/bin/env python
import pygst
pygst.require("0.10")
import gst
import gobject

class client(object):
    def __init__(self):
        self.buffers = []
        pipeline = gst.parse_launch('''audiotestsrc name=testsrc !
            decodebin name=decode ! audio/x-raw-int !
            audioconvert name=audioconvert !
            vorbisenc name=vorbisenc ! appsink name=sink sync=False''')
        sink = pipeline.get_by_name('sink')
        sink.set_property('emit-signals', True)
        sink.connect('new-preroll', self.sink_new_preroll)
        sink.connect('new-buffer', self.sink_new_buffer)
        sink.connect('eos', self.eos)
        #### DEBUGGING CAPS OUTPUT
        caps = {}
        for element in ['testsrc', 'decode', 'audioconvert',
                        'vorbisenc', 'sink']:
            print
            print
            e = pipeline.get_by_name(element)
            if e:
                for p in ['sink', 'src']:
                    pad = e.get_pad(p)
                    key = "%s%s" % (element,p)
                    mess_name = "%s %s " % (element.upper(), p.upper())
                    if pad and hasattr(pad, 'get_caps'):
                        caps[key] = pad.get_caps()
                        print "%s CAPS: " % mess_name, caps[key]
                    else:
                        caps["%s%s" % (element, p)] = None
                        print "%s HAS NO CAPS" % mess_name
            print
            print
            print
        self.caps = caps['vorbisencsink']
        print
        print "self.caps: ",self.caps
        print
        pipeline2 = gst.parse_launch(''' appsrc name=source !
            vorbisdec ! audioconvert ! alsasink''')
        source = pipeline2.get_by_name("source")
        self.caps and source.set_property('caps', gst.Caps(self.caps))
        #source.set_property('emit-signals', True)
        source.connect('need-data', self.source_pull_buffer)
        source.connect('push-buffer', self.source_pull_buffer)
        self.source = source
        pipeline.set_state(gst.STATE_PLAYING)
        pipeline2.set_state(gst.STATE_PLAYING)

    def sink_new_preroll(self, sink):
        #print "Prerolling sink"
        buf = sink.emit('pull-preroll')
        self.buffers.append(buf)
        #print "PREROLL BUFFERS: ", len(self.buffers)
        self.source.emit('push-buffer', buf)
        return True

    def sink_new_buffer(self, sink):
        #print "new buffer from sink"
        buf = sink.emit('pull-buffer')
        self.buffers.append(buf)
        #print "NEW BUFFER BUFFERS: ", len(self.buffers)
        self.source.emit('push-buffer', buf)
        return True

    def eos(self, sink):
        return True

    def source_pull_buffer(self, source, length):
        if self.buffers:
            buf = self.buffers.pop(0)
            source.emit('push-buffer', buf)
        return True

    def pull_next_buffer(self):
        self.source.emit('push-buffer', self.buffers.pop(0))
        return True

if __name__ == "__main__":
    client()
    loop = gobject.MainLoop()
    loop.run()
----------------------------------------------------------------------