<html style="direction: ltr;">
  <head>
    <meta content="text/html; charset=ISO-8859-1"
      http-equiv="Content-Type">
    <style>body p { margin-bottom: 0cm; margin-top: 0pt; } </style>
    <style>body p { margin-bottom: 0cm; margin-top: 0pt; } </style>
  </head>
  <body style="direction: ltr;"
    bidimailui-detected-decoding-type="latin-charset" bgcolor="#FFFFFF"
    text="#000000">
    <p> OK! I've solved most of my issues, and it was not easy. I'm
      posting this so that others can learn from my (mis)adventures.</p>
    <br>
    My original approach was essentially good. If you want to be able to
    add/remove sinks dynamically to a playing pipeline, you need
    something like this:<br>
    <br>
    playbin2 ! tee ! valve ! queue2 ! ... ! sink<br>
    <br>
    The whole "valve ! queue2 ! ... ! sink" is considered a branch, and
    you need a whole branch per sink. Note that all the branch elements
    and the tee are all in one "bin", and that I use a ghost pad to
    connect that bin to the playbin2 element.<br>
    <br>
    Why the "queue2" elements? We need this because we use a "tee", but
    need every branch to have its own clock.<br>
    <br>
    Why the "valve"? You'll see later when we need to remove the branch.<br>
    <br>
    First off, before you add any real branch, you want to add one fake
    one: "queue2 ! fakesink sync=true". Note that we didn't use a valve
    here, because we're never going to remove this branch. Also note
    "sync=true", otherwise the fakesink will swallow the whole stream in
    less than second. This branch guarantees that the pipeline will keep
    going at the right pace even if no-one is connected.<br>
    <br>
    To add a branch, you need to make sure you're in the glib main loop
    thread. I did this in Python using gst.idle_add(callback). Make sure
    to return False from your callback! I found out the hard way that if
    you don't, your callback will *repeatedly execute*.<br>
    <br>
    First, build all your branch elements and them to the bin. Then,
    link them together after the tee, and sync the state in order. From
    my code:<br>
    <br>
    gst.element_link_many(self.tee, valve, queue, sink)<br>
    valve.sync_state_with_parent()<br>
    queue.sync_state_with_parent()<br>
    sink.sync_state_with_parent()<br>
    <br>
    That's it! Your branch should be playing now.<br>
    <br>
    Removing a branch is much trickier. Again, make sure you are in the
    glib main loop thread. Then, turn off the valve, and turn :<br>
    <br>
    valve.set_property('drop', True)<br>
    <br>
    Now comes the tricky part. Before removing our branch elements from
    the bin, they need to be in state NULL. However, setting state to
    NULL can often be asynchronous, so you need a way to wait until the
    state is actually NULL. Here's how: we create a special array called
    "element_to_remove" which will keep track of them. Then, we ask the
    elements to stop like so:<br>
    <br>
    self.elements_to_remove.append(valve)<br>
    valve.set_state(gst.STATE_NULL)<br>
    self.elements_to_remove.append(queue)<br>
    queue.set_state(gst.STATE_NULL)<br>
    self.elements_to_remove.append(sink)<br>
    sink.set_state(gst.STATE_NULL)<br>
    <br>
    How do we know when they stop? Well, we set up the usual bus
    listener in the beggining:<br>
    <br>
    bus = self.player.get_bus()<br>
    bus.add_signal_watch()<br>
    bus.connect('message', self._on_message)<br>
    <br>
    Then, in&nbsp; _on_message we can look for the state change and safely
    remove our elements:<br>
    <br>
    if message.type == gst.MESSAGE_STATE_CHANGED:<br>
    &nbsp;&nbsp;&nbsp; oldstate, newstate, pending = message.parse_state_changed()<br>
    &nbsp;&nbsp;&nbsp; element = message.src<br>
    &nbsp;&nbsp;&nbsp; if newstate == gst.STATE_NULL:<br>
    &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; remove = False<br>
    &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; try:<br>
    &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.elements_to_remove.remove(element)<br>
    &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; remove = True<br>
    &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; except ValueError:<br>
    &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; pass<br>
    &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if remove:<br>
    &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; element.set_locked_state(True)<br>
    &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; self.bin.remove(element)<br>
    &nbsp;<br>
    Note that we've also locked the element's state. That's not entirely
    necessary, but it's just a way to guarantee it doesn't change for
    some reason.<br>
    <br>
    That's it! You can add/remove branches while the pipeline is
    playing.<br>
    <br>
    The other part of my problem with adding fault-tolerance to the
    networking sinks. I could not manage this with PulseAudio, and would
    still love to hear suggestions on how. I'm imagining some kind of
    valve-like element to sit before the sink, and simulate a fakesink
    whenever the real sink is not behaving well. What I did instead was
    abandon PulseAudio as the transport, and use RTP on the server and
    on the clients. The client can then use its local PulseAudio sink.
    So, it's PulseAudio, just not as the transport protocol. RTP was
    quite difficult to set up, and is an adventure story for another
    day...<br>
    <br>
    -Tal<br>
    <br>
  </body>
</html>