telepathy-logger: log-walker: Use a queue instead of a mutex for serializing operations

Debarshi Ray debarshir at kemper.freedesktop.org
Tue Jan 15 08:18:31 PST 2013


Module: telepathy-logger
Branch: master
Commit: 7d84e7b97603af85c78a3021ae9aabe572f942c9
URL:    http://cgit.freedesktop.org/telepathy/telepathy-logger/commit/?id=7d84e7b97603af85c78a3021ae9aabe572f942c9

Author: Debarshi Ray <debarshir at freedesktop.org>
Date:   Wed Jan  9 14:28:34 2013 +0100

log-walker: Use a queue instead of a mutex for serializing operations

Earlier we were using threads to implement the asynchronous get_events
and rewind methods, so we used a mutex to serialize them. However,
get_events has now been moved to a single-threaded model, so a second
request made while one is still pending would lock the mutex again from
the same thread, which leads to undefined behaviour.

Instead, we use a queue to store incoming operations (i.e. get_events or
rewind), which are then executed one after the other.

Fixes: https://bugs.freedesktop.org/54270

---

 telepathy-logger/log-walker.c |  107 +++++++++++++++++++++++++++--------------
 1 files changed, 70 insertions(+), 37 deletions(-)

diff --git a/telepathy-logger/log-walker.c b/telepathy-logger/log-walker.c
index dff3ed0..c6bf787 100644
--- a/telepathy-logger/log-walker.c
+++ b/telepathy-logger/log-walker.c
@@ -181,7 +181,7 @@ struct _TplLogWalkerPriv
   GList *caches;
   GList *history;
   GList *iters;
-  GMutex mutex;
+  GQueue *queue;
   TplLogEventFilter filter;
   gboolean is_start;
   gboolean is_end;
@@ -200,6 +200,12 @@ G_DEFINE_TYPE (TplLogWalker, tpl_log_walker, G_TYPE_OBJECT);
 
 static const gsize CACHE_SIZE = 5;
 
+typedef enum
+{
+  TPL_LOG_WALKER_OP_GET_EVENTS,
+  TPL_LOG_WALKER_OP_REWIND
+} TplLogWalkerOpType;
+
 typedef struct
 {
   GAsyncReadyCallback cb;
@@ -209,6 +215,7 @@ typedef struct
   GList *latest_cache;
   GList *latest_event;
   GList *latest_iter;
+  TplLogWalkerOpType op_type;
   gint64 latest_timestamp;
   guint num_events;
 } TplLogWalkerAsyncData;
@@ -220,6 +227,8 @@ typedef struct
   guint count;
 } TplLogWalkerHistoryData;
 
+static void tpl_log_walker_op_run (TplLogWalker *walker);
+
 
 static TplLogWalkerAsyncData *
 tpl_log_walker_async_data_new (void)
@@ -256,15 +265,23 @@ tpl_log_walker_async_operation_cb (GObject *source_object,
     GAsyncResult *result,
     gpointer user_data)
 {
+  TplLogWalker *walker;
+  TplLogWalkerPriv *priv;
   GSimpleAsyncResult *simple;
   TplLogWalkerAsyncData *async_data;
 
+  walker = TPL_LOG_WALKER (source_object);
+  priv = walker->priv;
+
   simple = G_SIMPLE_ASYNC_RESULT (result);
   async_data = (TplLogWalkerAsyncData *)
       g_simple_async_result_get_op_res_gpointer (simple);
 
   if (async_data->cb)
     async_data->cb (source_object, result, user_data);
+
+  g_object_unref (g_queue_pop_head (priv->queue));
+  tpl_log_walker_op_run (walker);
 }
 
 
@@ -479,14 +496,7 @@ tpl_log_walker_get_events (GObject *source_object,
     priv->is_start = FALSE;
 
  out:
-  g_mutex_unlock (&priv->mutex);
-
-  if (result != NULL)
-    g_simple_async_result_complete (simple);
-  else
-    g_simple_async_result_complete_in_idle (simple);
-
-  g_object_unref (simple);
+  g_simple_async_result_complete_in_idle (simple);
 }
 
 
@@ -505,10 +515,8 @@ tpl_log_walker_rewind (TplLogWalker *walker,
   priv = walker->priv;
   i = 0;
 
-  g_mutex_lock (&priv->mutex);
-
   if (priv->is_start == TRUE || num_events == 0)
-    goto out;
+    return;
 
   priv->is_end = FALSE;
 
@@ -548,9 +556,6 @@ tpl_log_walker_rewind (TplLogWalker *walker,
             priv->is_start = TRUE;
         }
     }
-
- out:
-  g_mutex_unlock (&priv->mutex);
 }
 
 
@@ -574,6 +579,36 @@ tpl_log_walker_rewind_async_thread (GSimpleAsyncResult *simple,
 
 
 static void
+tpl_log_walker_op_run (TplLogWalker *walker)
+{
+  TplLogWalkerPriv *priv;
+  GSimpleAsyncResult *simple;
+  TplLogWalkerAsyncData *async_data;
+
+  priv = walker->priv;
+
+  if (g_queue_is_empty (priv->queue))
+    return;
+
+  simple = G_SIMPLE_ASYNC_RESULT (g_queue_peek_head (priv->queue));
+  async_data = (TplLogWalkerAsyncData *)
+      g_simple_async_result_get_op_res_gpointer (simple);
+
+  switch (async_data->op_type)
+    {
+    case TPL_LOG_WALKER_OP_GET_EVENTS:
+      tpl_log_walker_get_events (G_OBJECT (walker), NULL, simple);
+      break;
+
+    case TPL_LOG_WALKER_OP_REWIND:
+      g_simple_async_result_run_in_thread (simple,
+          tpl_log_walker_rewind_async_thread, G_PRIORITY_DEFAULT, NULL);
+      break;
+    }
+}
+
+
+static void
 tpl_log_walker_dispose (GObject *object)
 {
   TplLogWalkerPriv *priv;
@@ -600,7 +635,7 @@ tpl_log_walker_finalize (GObject *object)
   TplLogWalkerPriv *priv;
 
   priv = TPL_LOG_WALKER (object)->priv;
-  g_mutex_clear (&priv->mutex);
+  g_queue_free_full (priv->queue, g_object_unref);
 
   G_OBJECT_CLASS (tpl_log_walker_parent_class)->finalize (object);
 }
@@ -669,8 +704,7 @@ tpl_log_walker_init (TplLogWalker *walker)
       TplLogWalkerPriv);
   priv = walker->priv;
 
-  g_mutex_init (&priv->mutex);
-
+  priv->queue = g_queue_new ();
   priv->is_start = TRUE;
   priv->is_end = FALSE;
 }
@@ -744,14 +778,18 @@ tpl_log_walker_get_events_async (TplLogWalker *walker,
     GAsyncReadyCallback callback,
     gpointer user_data)
 {
+  TplLogWalkerPriv *priv;
   GSimpleAsyncResult *simple;
   TplLogWalkerAsyncData *async_data;
 
   g_return_if_fail (TPL_IS_LOG_WALKER (walker));
 
+  priv = walker->priv;
+
   async_data = tpl_log_walker_async_data_new ();
   async_data->cb = callback;
   async_data->num_events = num_events;
+  async_data->op_type = TPL_LOG_WALKER_OP_GET_EVENTS;
 
   simple = g_simple_async_result_new (G_OBJECT (walker),
       tpl_log_walker_async_operation_cb, user_data,
@@ -760,8 +798,11 @@ tpl_log_walker_get_events_async (TplLogWalker *walker,
   g_simple_async_result_set_op_res_gpointer (simple, async_data,
       (GDestroyNotify) tpl_log_walker_async_data_free);
 
-  g_mutex_lock (&walker->priv->mutex);
-  tpl_log_walker_get_events (G_OBJECT (walker), NULL, simple);
+  g_queue_push_tail (priv->queue, g_object_ref (simple));
+  if (g_queue_get_length (priv->queue) == 1)
+    tpl_log_walker_op_run (walker);
+
+  g_object_unref (simple);
 }
 
 
@@ -822,14 +863,18 @@ tpl_log_walker_rewind_async (TplLogWalker *walker,
     GAsyncReadyCallback callback,
     gpointer user_data)
 {
+  TplLogWalkerPriv *priv;
   GSimpleAsyncResult *simple;
   TplLogWalkerAsyncData *async_data;
 
   g_return_if_fail (TPL_IS_LOG_WALKER (walker));
 
+  priv = walker->priv;
+
   async_data = tpl_log_walker_async_data_new ();
   async_data->cb = callback;
   async_data->num_events = num_events;
+  async_data->op_type = TPL_LOG_WALKER_OP_REWIND;
 
   simple = g_simple_async_result_new (G_OBJECT (walker),
       tpl_log_walker_async_operation_cb, user_data,
@@ -838,9 +883,9 @@ tpl_log_walker_rewind_async (TplLogWalker *walker,
   g_simple_async_result_set_op_res_gpointer (simple, async_data,
       (GDestroyNotify) tpl_log_walker_async_data_free);
 
-  g_simple_async_result_run_in_thread (simple,
-      tpl_log_walker_rewind_async_thread, G_PRIORITY_DEFAULT,
-      NULL);
+  g_queue_push_tail (priv->queue, g_object_ref (simple));
+  if (g_queue_get_length (priv->queue) == 1)
+    tpl_log_walker_op_run (walker);
 
   g_object_unref (simple);
 }
@@ -889,15 +934,9 @@ gboolean
 tpl_log_walker_is_start (TplLogWalker *walker)
 {
   TplLogWalkerPriv *priv;
-  gboolean retval;
 
   priv = walker->priv;
-
-  g_mutex_lock (&priv->mutex);
-  retval = priv->is_start;
-  g_mutex_unlock (&priv->mutex);
-
-  return retval;
+  return priv->is_start;
 }
 
 
@@ -914,13 +953,7 @@ gboolean
 tpl_log_walker_is_end (TplLogWalker *walker)
 {
   TplLogWalkerPriv *priv;
-  gboolean retval;
 
   priv = walker->priv;
-
-  g_mutex_lock (&priv->mutex);
-  retval = priv->is_end;
-  g_mutex_unlock (&priv->mutex);
-
-  return retval;
+  return priv->is_end;
 }



More information about the telepathy-commits mailing list