[gst-devel] "threadsafe" signals

Benjamin Otte in7y118 at public.uni-hamburg.de
Fri Jan 9 10:55:01 CET 2004


Attached is a file that implements a general way to do threadsafe signal
marshalling into main contexts. It works on every signal, regardless of
prototype.
This is done the following way:
- When the handler should be emitted, the closure instead adds an idle
handler with highest priority to the (a) main context.
- After that the thread blocks until the main context has handled the
signal.
- Then it continues.

This has 2 problems:
1) It blocks execution of the thread until the main context feels free
to handle the signal. (Note: only when there's actually a handler
connected, not by default)
2) If the signal isn't emitted in a thread but from the main context, you
get a deadlock.

Do we want this in gstreamer core?

Benjamin

PS: Thanks to Mike for pointing out that using a custom closure is a
cool way to handle this.
PPS: This time with attachment. To compile use
gcc `pkg-config --cflags --libs gtk+-2.0 gthread-2.0` signal.c
-------------- next part --------------
#include <gtk/gtk.h>

//#define USE_DEBUG

#ifdef USE_DEBUG
#  define DEBUG g_print
#else
#  define DEBUG(...) /* NOP */
#endif

typedef struct {
  GClosure		closure;
  GCallback		callback; /* imitate GCClosure here */
  GClosureNotify	user_destroy_data;
  GMutex *		mutex;
  GCond *		cond;
  GMainContext *	context;
} GstSyncClosure;

typedef struct {
  GClosure *		closure;
  GValue *		return_value;
  guint			n_param_values;
  const GValue *	param_value;
  gpointer		invocation_hint;
} GstMarshalCache;

static void
gst_sync_closure_destroy (gpointer unused, GClosure *callback)
{
  GstSyncClosure *sync;

  g_cond_free (sync->cond);
  g_mutex_free (sync->mutex);
}

static gboolean
gst_sync_real_marshal (gpointer data)
{
  GstMarshalCache *cache = (GstMarshalCache *) data;
  GstSyncClosure *sync = (GstSyncClosure *) cache->closure;

  g_mutex_lock (sync->mutex);
  cache->closure->marshal (cache->closure, cache->return_value, cache->n_param_values,
      cache->param_value, cache->invocation_hint, NULL);

  g_cond_signal (sync->cond);
  g_mutex_unlock (sync->mutex);

  return FALSE;
}

static void
gst_sync_meta_marshal (GClosure *closure, GValue *return_value, guint n_param_values,
    const GValue *param_values, gpointer invocation_hint, gpointer marshal_data)
{
  GstSyncClosure *sync = (GstSyncClosure *) closure;
  GstMarshalCache cache = { closure, return_value, n_param_values, param_values, invocation_hint };
  GSource *source;
  
  source = g_idle_source_new ();
  g_source_set_callback (source, gst_sync_real_marshal, &cache, NULL);
  g_source_set_priority (source, G_PRIORITY_HIGH);
  g_mutex_lock (sync->mutex);
  g_source_attach (source, sync->context);
  DEBUG ("%p     waiting...\n", g_thread_self ());
  g_cond_wait (sync->cond, sync->mutex);
  DEBUG ("%p     ...done\n", g_thread_self ());
  g_mutex_unlock (sync->mutex);
  /* FIXME: the source is deleted automagically, right? */
}

GClosure *
gst_sync_closure_new (GCallback callback_func, gpointer user_data,
    GClosureNotify destroy_data, GMainContext *context)
{
  GClosure *closure;
  GstSyncClosure *sync;
  
  g_return_val_if_fail (callback_func != NULL, NULL);
  
  closure = g_closure_new_simple (sizeof (GstSyncClosure), user_data);
  if (destroy_data)
    g_closure_add_finalize_notifier (closure, user_data, destroy_data);
  g_closure_add_finalize_notifier (closure, NULL, gst_sync_closure_destroy);
  g_closure_set_meta_marshal (closure, NULL, gst_sync_meta_marshal);
  sync = (GstSyncClosure *) closure;
  sync->callback = callback_func;
  sync->mutex = g_mutex_new ();
  sync->cond = g_cond_new ();
  sync->context = context ? context : g_main_context_default ();
  
  return closure;
}

#define gst_sync_signal_connect(instance, detailed_signal, c_handler, data) \
    gst_sync_signal_connect_data ((instance), (detailed_signal), (c_handler), (data), NULL, (GConnectFlags) 0)
#define gst_sync_signal_connect_after(instance, detailed_signal, c_handler, data) \
      gst_sync_signal_connect_data ((instance), (detailed_signal), (c_handler), (data), NULL, G_CONNECT_AFTER)
#define gst_sync_signal_connect_swapped(instance, detailed_signal, c_handler, data) \
      gst_sync_signal_connect_data ((instance), (detailed_signal), (c_handler), (data), NULL, G_CONNECT_SWAPPED)
  
gulong
gst_sync_signal_connect_data (gpointer instance, const gchar *detailed_signal,
        GCallback c_handler, gpointer data, GClosureNotify destroy_data, GConnectFlags connect_flags)
{
  GClosure *closure;
  gboolean after;
  
  g_return_val_if_fail (detailed_signal != NULL, 0);
  g_return_val_if_fail (c_handler != NULL, 0);
  
  after = (connect_flags & G_CONNECT_AFTER) != FALSE;
  
  closure = g_closure_ref (gst_sync_closure_new (c_handler, data, destroy_data, NULL));
  g_closure_sink (closure);
  closure->derivative_flag = (connect_flags & G_CONNECT_SWAPPED) != FALSE;
  
  return g_signal_connect_closure (instance, detailed_signal, closure, after);
}


/*** TEST THIS ***/


static gpointer
thread_run (gpointer labelp)
{
  GtkLabel *label = GTK_LABEL (labelp);
  gchar *text = g_strdup (gtk_label_get_label (label));

  while (TRUE) {
    g_usleep (g_random_int_range (1, 10 * G_USEC_PER_SEC / 1000));
    DEBUG ("%p   woken up\n", g_thread_self ());
    DEBUG ("%p   set to bla\n", g_thread_self ());
    gtk_label_set_text (label, "bla");
    DEBUG ("%p   set to %s\n", g_thread_self (), text);
    gtk_label_set_text (label, text);
  }

  return NULL;
}

static void
set_label (GtkLabel *from, GParamSpec *pspec, GtkLabel *to)
{
  /* do not GDK_THREADS_ENTER here, just assume we're in the correct thread 
   * Yes, that's not right, but we want to test that, damnit :) */
  gtk_label_set_label (to, gtk_label_get_label (from));
  DEBUG ("%p   setting label to %s\n", g_thread_self (), gtk_label_get_label (from));
}

gint
main (gint argc, gchar** argv)
{
  gint i;
  GtkWidget *change, *window;
  
  g_thread_init (NULL);
  gtk_init (&argc, &argv);
  change = gtk_label_new ("unset");

  /* Create 100 objects with different texts that emit a signal when you change them.
   * Then create a thread for each of them that randomly fires the signal.
   * Connect a sync signal to it that freaks GDK a bit */
  for (i = 0; i < 3; i++) {
    gchar *text = g_strdup_printf ("Thread %d", i);
    GtkWidget *label = gtk_label_new (text);
    gst_sync_signal_connect (label, "notify", (GCallback) set_label, change);
    g_thread_create (thread_run, label, FALSE, NULL);
  }
  
  /* create the window */
  window = gtk_window_new (GTK_WINDOW_TOPLEVEL);
  gtk_container_add (GTK_CONTAINER (window), change);
  gtk_widget_show_all (window);
  g_signal_connect (window, "destroy", gtk_main_quit, NULL);
  
  gtk_main ();
  
  return 0;
}


More information about the gstreamer-devel mailing list