[gst-devel] "threadsafe" signals
Benjamin Otte
in7y118 at public.uni-hamburg.de
Fri Jan 9 10:55:01 CET 2004
Attached is a file that implements a general way to do threadsafe signal
marshalling into main contexts. It works on every signal, regardless of
prototype.
This is done the following way:
- When the handler should be invoked, the closure instead adds an idle
handler with high priority to the (a) main context.
- After that the thread blocks until the main context has handled the
signal.
- Then it continues.
This has 2 problems:
1) It blocks execution of the thread until the main context feels free
to handle the signal. (Note: only when there's actually a handler
connected, not by default)
2) If the signal isn't emitted in a separate thread but from the thread
running the main context, you get a deadlock: that thread blocks waiting
for an idle handler that only it could run.
Do we want this in gstreamer core?
Benjamin
PS: Thanks to Mike for pointing out that using a custom closure is a
cool way to handle this.
PPS: This time with attachment. To compile use
gcc `pkg-config --cflags --libs gtk+-2.0 gthread-2.0` signal.c
-------------- next part --------------
#include <gtk/gtk.h>
/* Optional tracing: when USE_DEBUG is defined, DEBUG(...) is g_print;
 * otherwise DEBUG(...) expands to nothing. Uncomment the next line to
 * enable it. */
//#define USE_DEBUG
#ifdef USE_DEBUG
# define DEBUG g_print
#else
# define DEBUG(...) /* NOP */
#endif
/* Closure whose invocation is forwarded to a GMainContext.
 * The GClosure member is first because the code in this file casts between
 * GClosure * and GstSyncClosure *. */
typedef struct {
GClosure closure;
/* handler function given to gst_sync_closure_new() */
GCallback callback; /* imitate GCClosure here */
/* not assigned by any code visible in this file */
GClosureNotify user_destroy_data;
/* mutex/cond pair the emitting thread blocks on while the handler runs */
GMutex * mutex;
GCond * cond;
/* main context the idle source is attached to */
GMainContext * context;
} GstSyncClosure;
/* Arguments of one pending closure invocation, passed as callback data to
 * the idle source so the call can be replayed from the main context.
 * gst_sync_meta_marshal() fills it positionally, so field order matters. */
typedef struct {
/* closure to invoke (also cast to GstSyncClosure by the idle callback) */
GClosure * closure;
/* the remaining fields are forwarded unchanged to closure->marshal */
GValue * return_value;
guint n_param_values;
const GValue * param_value;
gpointer invocation_hint;
} GstMarshalCache;
/*
 * Finalize notifier for a GstSyncClosure: frees the condition variable and
 * the mutex stored in the closure.
 *
 * unused:   notifier data; this file registers the notifier with NULL
 * callback: the closure being finalized, which is the GstSyncClosure itself
 */
static void
gst_sync_closure_destroy (gpointer unused, GClosure *callback)
{
  /* Derive the sync closure from the closure argument; dereferencing an
   * uninitialized pointer here would be undefined behavior. */
  GstSyncClosure *sync = (GstSyncClosure *) callback;

  (void) unused;

  g_cond_free (sync->cond);
  g_mutex_free (sync->mutex);
}
/*
 * Idle-source callback: replays the invocation described by the
 * GstMarshalCache passed as data, then signals the closure's condition
 * variable so the waiting emitter can continue. Both steps happen with the
 * closure's mutex held.
 *
 * Always returns FALSE, so the source is not dispatched again.
 */
static gboolean
gst_sync_real_marshal (gpointer data)
{
  GstMarshalCache *call = data;
  GClosure *target = call->closure;
  GstSyncClosure *self = (GstSyncClosure *) target;

  g_mutex_lock (self->mutex);

  /* Invoke the closure's own marshaller directly, with no marshal data. */
  target->marshal (target,
      call->return_value,
      call->n_param_values,
      call->param_value,
      call->invocation_hint,
      NULL);

  g_cond_signal (self->cond);
  g_mutex_unlock (self->mutex);

  return FALSE;
}
/*
 * Meta marshaller installed on every GstSyncClosure. Instead of invoking
 * the handler in the calling thread, it queues a high-priority idle source
 * on the closure's main context and blocks until that source has run
 * gst_sync_real_marshal().
 *
 * The invocation arguments are kept in a stack-allocated GstMarshalCache,
 * so this function must not return before the idle callback has finished.
 *
 * marshal_data is not used.
 *
 * NOTE(review): the wait below is a single g_cond_wait() with no predicate,
 * and the condition variable is shared per closure. A spurious wakeup, or
 * two threads emitting through the same closure at once, could presumably
 * let this function return before its own callback ran - confirm and
 * consider adding a per-invocation "done" flag.
 * NOTE(review): if this is called from the thread that dispatches
 * sync->context, nothing can run the idle source and the wait never ends.
 */
static void
gst_sync_meta_marshal (GClosure *closure, GValue *return_value, guint n_param_values,
    const GValue *param_values, gpointer invocation_hint, gpointer marshal_data)
{
  GstSyncClosure *sync = (GstSyncClosure *) closure;
  GstMarshalCache cache = { closure, return_value, n_param_values, param_values, invocation_hint };
  GSource *source;

  (void) marshal_data;

  source = g_idle_source_new ();
  g_source_set_callback (source, gst_sync_real_marshal, &cache, NULL);
  g_source_set_priority (source, G_PRIORITY_HIGH);

  /* Take the mutex before attaching: the callback also locks it, so it
   * cannot signal until g_cond_wait() has released the mutex. */
  g_mutex_lock (sync->mutex);
  g_source_attach (source, sync->context);
  /* The context holds its own reference once attached; drop the one from
   * g_idle_source_new() so the source is freed after it has been
   * dispatched instead of leaking on every emission. */
  g_source_unref (source);

  DEBUG ("%p waiting...\n", g_thread_self ());
  g_cond_wait (sync->cond, sync->mutex);
  DEBUG ("%p ...done\n", g_thread_self ());
  g_mutex_unlock (sync->mutex);
}
/*
 * Creates a closure whose invocations are marshalled through
 * gst_sync_meta_marshal().
 *
 * callback_func: handler to store in the closure; must not be NULL
 * user_data:     data passed to g_closure_new_simple() and, when
 *                destroy_data is set, to the finalize notifier
 * destroy_data:  optional finalize notifier for user_data, may be NULL
 * context:       main context to use; NULL selects the default context
 *
 * Returns the new closure, or NULL if callback_func is NULL.
 */
GClosure *
gst_sync_closure_new (GCallback callback_func, gpointer user_data,
    GClosureNotify destroy_data, GMainContext *context)
{
  GClosure *result;
  GstSyncClosure *self;

  g_return_val_if_fail (callback_func != NULL, NULL);

  result = g_closure_new_simple (sizeof (GstSyncClosure), user_data);
  self = (GstSyncClosure *) result;

  /* Fill in the sync-specific state. */
  self->callback = callback_func;
  self->mutex = g_mutex_new ();
  self->cond = g_cond_new ();
  if (context != NULL)
    self->context = context;
  else
    self->context = g_main_context_default ();

  /* Register cleanup: the caller's notifier first (if any), then ours. */
  if (destroy_data != NULL)
    g_closure_add_finalize_notifier (result, user_data, destroy_data);
  g_closure_add_finalize_notifier (result, NULL, gst_sync_closure_destroy);

  g_closure_set_meta_marshal (result, NULL, gst_sync_meta_marshal);

  return result;
}
/* Convenience wrappers around gst_sync_signal_connect_data(). Each passes
 * NULL as destroy_data and differs only in the connect flags:
 * no flags, G_CONNECT_AFTER, or G_CONNECT_SWAPPED. */
#define gst_sync_signal_connect(instance, detailed_signal, c_handler, data) \
gst_sync_signal_connect_data ((instance), (detailed_signal), (c_handler), (data), NULL, (GConnectFlags) 0)
#define gst_sync_signal_connect_after(instance, detailed_signal, c_handler, data) \
gst_sync_signal_connect_data ((instance), (detailed_signal), (c_handler), (data), NULL, G_CONNECT_AFTER)
#define gst_sync_signal_connect_swapped(instance, detailed_signal, c_handler, data) \
gst_sync_signal_connect_data ((instance), (detailed_signal), (c_handler), (data), NULL, G_CONNECT_SWAPPED)
/*
 * Connects c_handler to a signal through a GstSyncClosure bound to the
 * default main context (gst_sync_closure_new() is called with a NULL
 * context).
 *
 * instance:        object to connect to
 * detailed_signal: signal name, must not be NULL
 * c_handler:       handler function, must not be NULL
 * data:            user data for the handler
 * destroy_data:    optional finalize notifier for data, may be NULL
 * connect_flags:   G_CONNECT_AFTER and/or G_CONNECT_SWAPPED
 *
 * Returns the value of g_signal_connect_closure(), or 0 if a precondition
 * fails.
 */
gulong
gst_sync_signal_connect_data (gpointer instance, const gchar *detailed_signal,
    GCallback c_handler, gpointer data, GClosureNotify destroy_data, GConnectFlags connect_flags)
{
  GClosure *closure;
  gboolean after;
  gulong handler_id;

  g_return_val_if_fail (detailed_signal != NULL, 0);
  g_return_val_if_fail (c_handler != NULL, 0);

  after = (connect_flags & G_CONNECT_AFTER) != FALSE;

  /* Turn the floating closure into a reference owned by this function. */
  closure = g_closure_ref (gst_sync_closure_new (c_handler, data, destroy_data, NULL));
  g_closure_sink (closure);
  closure->derivative_flag = (connect_flags & G_CONNECT_SWAPPED) != FALSE;

  handler_id = g_signal_connect_closure (instance, detailed_signal, closure, after);

  /* The signal connection keeps its own reference. Release ours so the
   * closure is finalized when the handler is disconnected, or right away
   * if the connection failed. */
  g_closure_unref (closure);

  return handler_id;
}
/*** TEST THIS ***/
/*
 * Test thread body: loops forever, sleeping for a random interval and then
 * setting the label's text to "bla" and back to the text it had when the
 * thread started.
 *
 * labelp: the GtkLabel to modify
 *
 * The loop has no exit, so the trailing return is never reached.
 */
static gpointer
thread_run (gpointer labelp)
{
  GtkLabel *target = GTK_LABEL (labelp);
  /* private copy of the label's starting text, restored on every pass */
  gchar *original = g_strdup (gtk_label_get_label (target));

  for (;;) {
    g_usleep (g_random_int_range (1, 10 * G_USEC_PER_SEC / 1000));
    DEBUG ("%p woken up\n", g_thread_self ());

    DEBUG ("%p set to bla\n", g_thread_self ());
    gtk_label_set_text (target, "bla");

    DEBUG ("%p set to %s\n", g_thread_self (), original);
    gtk_label_set_text (target, original);
  }

  return NULL;
}
/*
 * "notify" handler used by the test: copies the label string of `from`
 * into `to`. The pspec argument is ignored.
 */
static void
set_label (GtkLabel *from, GParamSpec *pspec, GtkLabel *to)
{
  /* Deliberately no GDK_THREADS_ENTER: the test assumes this handler
   * already runs in the correct thread, which is what it wants to check. */
  const gchar *current = gtk_label_get_label (from);

  gtk_label_set_label (to, current);
  DEBUG ("%p setting label to %s\n", g_thread_self (), gtk_label_get_label (from));
}
/*
 * Test program: creates a few labels, each modified from its own thread,
 * and mirrors their changes into one visible label through sync signal
 * connections. Runs the GTK main loop until the window is destroyed.
 */
gint
main (gint argc, gchar** argv)
{
  gint i;
  GtkWidget *change, *window;

  g_thread_init (NULL);
  gtk_init (&argc, &argv);
  change = gtk_label_new ("unset");

  /* Create 3 objects with different texts that emit a signal when you change them.
   * Then create a thread for each of them that randomly fires the signal.
   * Connect a sync signal to it that freaks GDK a bit */
  for (i = 0; i < 3; i++) {
    gchar *text = g_strdup_printf ("Thread %d", i);
    GtkWidget *label = gtk_label_new (text);

    /* free the temporary string now that the label has been created from it */
    g_free (text);

    gst_sync_signal_connect (label, "notify", (GCallback) set_label, change);
    g_thread_create (thread_run, label, FALSE, NULL);
  }

  /* create the window */
  window = gtk_window_new (GTK_WINDOW_TOPLEVEL);
  gtk_container_add (GTK_CONTAINER (window), change);
  gtk_widget_show_all (window);
  g_signal_connect (window, "destroy", gtk_main_quit, NULL);
  gtk_main ();

  return 0;
}
More information about the gstreamer-devel
mailing list