[gst-devel] appsink -> appsrc asynchronous help needed please.

Rohan rohan at perzonae.com
Thu Nov 26 09:09:29 CET 2009


Hi all,
I have been wrestling with this problem, and should have asked earlier
about how to make it work, but it always seemed like the solution was
just around the corner.

I have a Python program that uses appsink and appsrc to write buffers
to an array, pop them off again and display them.  This is a prototype
to get the pipeline working, and it does.

The goal is to have the appsink append buffers to an array, and to
have them popped off asynchronously, have stuff done to them (like
sending them over the network with our protocol), and then pushed to
the appsrc, which sends them to the display.  The tests are actually
being done with sound, but the needed functionality is the same.
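
A minimal sketch of that handoff with no GStreamer involved, only to
show the shape of it: one side appends buffers to a shared queue, and
a worker thread pops them in order, processes them and hands them on.
The names BufferQueue, run_consumer, process and deliver are
hypothetical stand-ins (for the appsink side, the network step and the
appsrc side), not anything from pygst.

```python
import threading
from collections import deque


class BufferQueue(object):
    """Thread-safe FIFO that decouples the producer from the consumer."""

    def __init__(self):
        self._items = deque()
        self._cond = threading.Condition()
        self._closed = False

    def put(self, item):
        # Producer side: called wherever a new buffer becomes available.
        with self._cond:
            self._items.append(item)
            self._cond.notify()

    def close(self):
        # Signal that no more items will arrive (end of stream).
        with self._cond:
            self._closed = True
            self._cond.notify_all()

    def get(self):
        # Consumer side: block until an item arrives or the queue is closed.
        # None is used as the "closed and drained" marker, so items
        # themselves must not be None.
        with self._cond:
            while not self._items and not self._closed:
                self._cond.wait()
            if self._items:
                return self._items.popleft()
            return None


def run_consumer(queue, process, deliver):
    # Pop items in arrival order, process each one, then pass it on.
    while True:
        item = queue.get()
        if item is None:
            break
        deliver(process(item))


def demo():
    # Strings stand in for buffers; upper() stands in for the processing.
    queue = BufferQueue()
    delivered = []
    worker = threading.Thread(
        target=run_consumer,
        args=(queue, lambda b: b.upper(), delivered.append))
    worker.start()
    for chunk in ["buf0", "buf1", "buf2"]:
        queue.put(chunk)
    queue.close()
    worker.join()
    return delivered
```

The producer never waits for the consumer here, which is the property
the while loop version lacks.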

If I do this synchronously it works, as below:

-----------------------------------------------------------

#!/usr/bin/env python

import pygst
pygst.require("0.10")
import gst

# import gobject


class client(object):
     def __init__(self):
         self.buffers = []
         pipeline = gst.parse_launch('''audiotestsrc name=testsrc !
         decodebin ! audio/x-raw-int !
         audioconvert ! vorbisenc name=vorbisenc !
         appsink name=sink sync=False''')
         sink = pipeline.get_by_name('sink')

         pipeline2 = gst.parse_launch(''' appsrc name=source !
          vorbisdec ! audioconvert ! alsasink''')
         source = pipeline2.get_by_name("source")

         pipeline.set_state(gst.STATE_PLAYING)
         pipeline2.set_state(gst.STATE_PLAYING)

         while True:
             try:
                 # print "PULL BUFFER"
                 buf = sink.emit('pull-buffer')
                 self.buffers.append(buf)

                 if self.buffers:
                     source.emit('push-buffer', self.buffers.pop(0))
             except SystemError, e:
                 # it is probably a bug that emit() raises a SystemError here
                 print 'SystemError', e
                 break

if __name__ == "__main__":
     client()

----------------------------------------------------------------

This works a treat, even though it just sticks everything that must
happen into a while loop.  You can imagine the various problems this
has regarding performance etc., but it is a great proof of concept.

The next step is to make this asynchronous, and this is where my woes
really began.  I believe I have it narrowed down to a problem with the
caps information being passed, but I am lost as to what I am doing
wrong.

What happens at the moment is that the buffers are being packed on the
array, and one of either the 'need-data' or 'push-buffer' signals is
getting triggered successfully.  At this point I am getting this
error.

(sound_asynchronous_appsink.py:1653): GStreamer-CRITICAL **:
gst_pad_set_caps: assertion `caps == NULL || gst_caps_is_fixed (caps)'
failed

and running it with --gst-debug-level=3 I have found this:

0:00:00.049407818  1684  0x8347348 INFO        GST_ELEMENT_PADS
gstelement.c:970:gst_element_get_static_pad: no such pad 'src%d' in
element "decode"
0:00:00.281942486  1684  0x8347348 INFO        GST_ELEMENT_PADS
gstutils.c:1208:gst_element_get_compatible_pad:<decode> Could not find a
compatible pad to link to capsfilter0:sink
0:00:00.281989280  1684  0x8347348 INFO                 default
gstutils.c:1872:gst_element_link_pads_filtered: Could not link elements
0:00:00.282014492  1684  0x8347348 INFO              GST_STATES
gstelement.c:2238:gst_element_continue_state:<capsfilter0> completed
state change to NULL

and later,

0:00:00.282219407  1684  0x8347348 INFO            GST_PIPELINE
./grammar.y:511:gst_parse_perform_link: linking audioconvert:(any) to
vorbisenc:(any) (0/0) with caps "(NULL)"

although later it seems happy with this.

And the place that had the error above has this now:

0:00:00.515774020  1684  0x84ecac0 INFO              GST_STATES
gstelement.c:2251:gst_element_continue_state:<pipeline0> posting
state-changed PAUSED to PLAYING

(sound_asynchronous_appsink.py:1684): GStreamer-CRITICAL **:
gst_pad_set_caps: assertion `caps == NULL || gst_caps_is_fixed (caps)'
failed
0:00:00.531633509  1684 0xb6700468 INFO                 basesrc
gstbasesrc.c:2326:gst_base_src_loop:<source> pausing after
gst_pad_push() = not-negotiated
0:00:00.531698531  1684 0xb6700468 WARN                 basesrc
gstbasesrc.c:2378:gst_base_src_loop:<source> error: Internal data flow
error.
0:00:00.531718645  1684 0xb6700468 WARN                 basesrc
gstbasesrc.c:2378:gst_base_src_loop:<source> error: streaming task
paused, reason not-negotiated (-4)
0:00:00.531765090  1684 0xb6700468 INFO        GST_ERROR_SYSTEM
gstelement.c:1763:gst_element_message_full:<source> posting message:
Internal data flow error.
0:00:00.531807972  1684 0xb6700468 INFO        GST_ERROR_SYSTEM
gstelement.c:1786:gst_element_message_full:<source> posted error
message: Internal data flow error.
0:00:00.531866499  1684 0xb6700468 INFO              GST_STATES
gstbin.c:2815:bin_handle_async_done:<pipeline1> committing state from
READY to PAUSED, old pending PLAYING
0:00:00.531892131  1684 0xb6700468 INFO              GST_STATES
gstbin.c:2844:bin_handle_async_done:<pipeline1> continue state change,
pending PLAYING
0:00:00.531952055  1684  0x84ecac0 INFO              GST_STATES
gstbin.c:2637:gst_bin_continue_func:<pipeline1> continue state change
PAUSED to PLAYING, final PLAYING
0:00:00.532047388  1684  0x84ecac0 WARN                     bin
gstbin.c:2312:gst_bin_do_latency_func:<pipeline1> failed to query latency


The code that got these results is further down, but what I am
wondering is why this works as the synchronous chain above, but not
asynchronously.  I have a small loop that prints out all the caps in
the first pipeline, and I have tried setting the appsrc caps to each
of these in turn, but still no luck, and no luck with something like
"ANY" as the caps either.  If I don't set the caps, which is closer to
the first example, it happily plays but there is no sound output.

I have read up on caps but am still not seeing what I am missing, and
I think this has something to do with my not fully understanding how
caps are used.
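
The assertion in the error says that gst_pad_set_caps only accepts
caps that are NULL or fixed.  A minimal sketch of one way to respect
that, assuming the buffers pulled from the appsink already carry
fixed caps and that those are the right caps for the appsrc: take the
caps from the buffer itself instead of from a pad.  The helper name
push_with_buffer_caps is hypothetical; get_caps(), is_fixed(),
set_property() and emit() are assumed to behave as they do on
gst.Buffer, gst.Caps and the appsrc element in pygst 0.10.

```python
def push_with_buffer_caps(source, buf):
    """Push buf into an appsrc-like element, reusing the buffer's own caps.

    `buf` is assumed to offer get_caps(), and `source` to offer
    set_property() and emit(), as in pygst 0.10.  Returns True if caps
    were set on the source before pushing, False if the buffer was
    pushed without touching the caps property.
    """
    caps = buf.get_caps()
    caps_set = False
    # Only fixed caps may be set; unfixed caps (ranges, lists, ANY) are
    # what the gst_caps_is_fixed assertion complains about.
    if caps is not None and caps.is_fixed():
        source.set_property('caps', caps)
        caps_set = True
    source.emit('push-buffer', buf)
    return caps_set
```

Setting the property on every buffer is wasteful; it is written this
way only to keep the sketch short.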

Any explanation of how caps should be passed around, and more
specifically of what I am doing wrong here, would be most
appreciated.  I have googled and tried all sorts of examples, but it
is still not sinking in.

Rohan

Here is the other script:

-----------------------------------------------------------------------------
#!/usr/bin/env python

import pygst
pygst.require("0.10")
import gst

import gobject


class client(object):
     def __init__(self):
         self.buffers = []
         pipeline = gst.parse_launch('''audiotestsrc name=testsrc !
         decodebin name=decode ! audio/x-raw-int !
         audioconvert name=audioconvert !
         vorbisenc name=vorbisenc ! appsink name=sink sync=False''')
         sink = pipeline.get_by_name('sink')
         sink.set_property('emit-signals', True)
         sink.connect('new-preroll', self.sink_new_preroll)
         sink.connect('new-buffer', self.sink_new_buffer)
         sink.connect('eos', self.eos)

         #### DEBUGGING CAPS OUTPUT
         caps = {}
         for element in ['testsrc', 'decode', 'audioconvert',
                         'vorbisenc', 'sink']:
             print
             print
             e = pipeline.get_by_name(element)
             if e:
                 for p in ['sink', 'src']:
                     pad = e.get_pad(p)
                     key = "%s%s" % (element,p)
                     mess_name = "%s %s " % (element.upper(), p.upper())
                     if pad and hasattr(pad, 'get_caps'):
                         caps[key] = pad.get_caps()
                         print "%s CAPS: " % mess_name, caps[key]
                     else:
                         caps["%s%s" % (element, p)] = None
                         print "%s HAS NO CAPS" % mess_name
                     print
                     print
             print


         self.caps = caps['vorbisencsink']
         print
         print "self.caps: ",self.caps
         print

         pipeline2 = gst.parse_launch(''' appsrc name=source !
          vorbisdec ! audioconvert ! alsasink''')
         source = pipeline2.get_by_name("source")
         if self.caps:
             source.set_property('caps', gst.Caps(self.caps))
         #source.set_property('emit-signals', True)
         source.connect('need-data', self.source_pull_buffer)
         source.connect('push-buffer', self.source_pull_buffer)
         self.source = source

         pipeline.set_state(gst.STATE_PLAYING)
         pipeline2.set_state(gst.STATE_PLAYING)

     def sink_new_preroll(self, sink):
         #print "Prerolling sink"
         buf = sink.emit('pull-preroll')
         self.buffers.append(buf)
         #print "PREROLL BUFFERS: ", len(self.buffers)
         self.source.emit('push-buffer', buf)
         return True

     def sink_new_buffer(self, sink):
         #print "new buffer from sink"
         buf = sink.emit('pull-buffer')
         self.buffers.append(buf)
         #print "NEW BUFFER BUFFERS: ", len(self.buffers)
         self.source.emit('push-buffer', buf)
         return True

     def eos(self, sink):
         return True

     def source_pull_buffer(self, source, length):
         if self.buffers:
             buf = self.buffers.pop(0)
             source.emit('push-buffer', buf)
         return True

     def pull_next_buffer(self):
         self.source.emit('push-buffer', self.buffers.pop(0))
         return True

if __name__ == "__main__":
     client()
     loop = gobject.MainLoop()
     loop.run()

----------------------------------------------------------------------
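
For comparison, a minimal sketch of how the signal handlers could be
split up, assuming the appsrc emits 'need-data' and 'enough-data' and
that 'push-buffer' is only ever emitted by the application, never
connected to.  The class BufferRelay and its method names are
hypothetical, the sink and source are only duck-typed here, and
locking between the streaming threads is ignored.

```python
from collections import deque


class BufferRelay(object):
    """Queue buffers from an appsink-like object, feed an appsrc-like one."""

    def __init__(self, source):
        self.source = source
        self.pending = deque()
        self.wants_data = False

    def on_new_buffer(self, sink):
        # 'new-buffer' handler: pull the buffer, queue it, and forward it
        # only if the source has asked for data.
        self.pending.append(sink.emit('pull-buffer'))
        self._flush()

    def on_need_data(self, source, length):
        # 'need-data' handler: remember that the source is hungry.
        self.wants_data = True
        self._flush()

    def on_enough_data(self, source):
        # 'enough-data' handler: stop pushing until asked again.
        self.wants_data = False

    def _flush(self):
        # Push queued buffers in order for as long as the source wants them.
        while self.wants_data and self.pending:
            self.source.emit('push-buffer', self.pending.popleft())
```

In the script above this would presumably be wired up with
sink.connect('new-buffer', relay.on_new_buffer),
source.connect('need-data', relay.on_need_data) and
source.connect('enough-data', relay.on_enough_data), where relay is a
BufferRelay(source).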




