Best way to update a stream without interrupting the bus?

Liz Maclean liz at harbor.co
Mon Feb 27 15:34:54 UTC 2023


Hi there! I’m working on streaming video and audio from a camera (currently just my Mac), and will eventually need to support changing settings on the stream on the fly (for example, when the camera receives an HTTP request to switch to greyscale). 

- The expected behavior would be that I have a bus interrupt that is triggered when the bus updates, and that interrupt then updates the state machine of the main loop, without stopping the stream.  
- The current behavior is that the bus drives the main loop in 100 ms intervals. When sleep() is called inside that loop, the stream stops for the duration of the sleep.

What is the best approach for the above expected behavior? I looked through the documentation, but the closest I could find was the main_loop functionality, and that still seemed to be driven by the bus.


#include <gst/gst.h>

#include <stdio.h>
#include <unistd.h>

#ifdef __APPLE__
#include <TargetConditionals.h>
#endif

/*
 * Bundle of the GStreamer elements this program works with, kept in one
 * struct so a single pointer can be handed to callbacks.
 */
typedef struct _CustomData CustomData;

struct _CustomData {
  GstElement *pipeline;        /* top-level pipeline that owns the elements */
  GstElement *source;          /* video capture element */
  GstElement *video;           /* not assigned anywhere in this file */
  GstElement *sink;            /* video output element */
  GstElement *video_save_rate; /* "videorate" element */
};

int _main(int argc, char *argv[]) {
  CustomData data;
  GstBus *bus;
  GstMessage *msg;
  GstStateChangeReturn ret;
  gboolean terminate = FALSE;
  gboolean forceAspect = TRUE;
  guint count = 0;

  /* Initialize GStreamer */
  gst_init(&argc, &argv);

  /* Create the elements */
  data.source = gst_element_factory_make("autovideosrc", "source");
  data.sink = gst_element_factory_make("osxvideosink", "sink");
  data.video_save_rate =
      gst_element_factory_make("videorate", "video_save_rate");

  /* Create the empty pipeline */
  data.pipeline = gst_pipeline_new("test-pipeline");

  if (!data.pipeline || !data.source || !data.sink || !data.video_save_rate) {
    g_printerr("Not all elements could be created.\n");
    return -1;
  }

  /* Build the pipeline */
  gst_bin_add_many(GST_BIN(data.pipeline), data.source, data.video_save_rate,
                   data.sink, NULL);
  if (gst_element_link(data.source, data.sink) != TRUE) {
    g_printerr("Elements could not be linked.\n");
    gst_object_unref(data.pipeline);
    return -1;
  }

  /* Modify the source's properties */
  g_object_set(data.sink, "force-aspect-ratio", forceAspect, NULL);

  /* Start playing */
  ret = gst_element_set_state(data.pipeline, GST_STATE_PLAYING);
  if (ret == GST_STATE_CHANGE_FAILURE) {
    g_printerr("Unable to set the pipeline to the playing state.\n");
    gst_object_unref(data.pipeline);
    return -1;
  }

  /* Listen to the bus */
  bus = gst_element_get_bus(data.pipeline);
  do {
    printf("%s\n", ".");
    msg = gst_bus_timed_pop_filtered(
        bus, 100 * GST_MSECOND,
        (GstMessageType)(GST_MESSAGE_STATE_CHANGED | GST_MESSAGE_ERROR |
                         GST_MESSAGE_EOS));
    printf("sleeping\n");
    sleep(1);

    if (count % 50 == 0) {
      forceAspect = !forceAspect;
      printf("5 seconds have passed. Switching aspect ratio to %d\n",
             forceAspect);
      g_object_set(data.sink, "force-aspect-ratio", forceAspect, NULL);
    }

    /* Parse message */
    if (msg != NULL) {
      GError *err;
      gchar *debug_info;

      switch (GST_MESSAGE_TYPE(msg)) {
      case GST_MESSAGE_ERROR:
        gst_message_parse_error(msg, &err, &debug_info);
        g_printerr("Error received from element %s: %s\n",
                   GST_OBJECT_NAME(msg->src), err->message);
        g_printerr("Debugging information: %s\n",
                   debug_info ? debug_info : "none");
        g_clear_error(&err);
        g_free(debug_info);
        terminate = TRUE;
        break;
      case GST_MESSAGE_EOS:
        g_print("End-Of-Stream reached.\n");
        terminate = TRUE;
        break;
      case GST_MESSAGE_STATE_CHANGED:
        /* We are only interested in state-changed messages from the pipeline */
        if (GST_MESSAGE_SRC(msg) == GST_OBJECT(data.pipeline)) {
          GstState old_state, new_state, pending_state;
          gst_message_parse_state_changed(msg, &old_state, &new_state,
                                          &pending_state);
          g_print("Pipeline state changed from %s to %s:\n",
                  gst_element_state_get_name(old_state),
                  gst_element_state_get_name(new_state));
        }
        break;
      default:
        /* We should not reach here */
        g_printerr("Unexpected message received.\n");
        break;
      }
      gst_message_unref(msg);
    }
    count++;
  } while (!terminate);

  /* Free resources */
  gst_object_unref(bus);
  gst_element_set_state(data.pipeline, GST_STATE_NULL);
  gst_object_unref(data.pipeline);
  return 0;
}

/*
 * Program entry point.
 *
 * On macOS (but not iOS) the real work in _main() is started through
 * gst_macos_main(); everywhere else _main() is called directly. The result of
 * whichever call is made becomes the process exit status.
 *
 * The TARGET_OS_* macros tested below are defined by <TargetConditionals.h>.
 * Without that header they evaluate to 0 in the #if and the macOS branch is
 * silently skipped, so the header must be included on Apple platforms.
 */
int main(int argc, char *argv[]) {
  int status;

#if defined(__APPLE__) && TARGET_OS_MAC && !TARGET_OS_IPHONE
  status = gst_macos_main((GstMainFunc)_main, argc, argv, NULL);
#else
  status = _main(argc, argv);
#endif

  return status;
}


More information about the gstreamer-devel mailing list