dbus message queue latency time is not constant for same data payload

deepak jewargi djewargi at gmail.com
Tue May 23 04:26:39 UTC 2023


Hi All,


I have implemented a dbus application using dbus-1 API's on linux
platform and testing profiling on ubuntu machine . Get issue as mentioned
below

Following are the challenges facing

1. I have implemented dbus communication using signaling , dbus
initialization ,  dbus publish  and dbus subscribe
2. When I started testing on the linux ubuntu machine for profiling of
dbus message Transmit from one process and receiver  another process using
dbus signaling, It's working fine for the first 15 minutes , latency time
is on average 500 microSeconds. After 30 minutes,  20 -30 messages are
getting latency of  80 milliseconds . Each message transmitted at 1
millisecond delay
Latency time between transmitting from one process and receiving at another
process is not constant for the same payload message.
3. Is any benchmark for message transmit and receiver timeline (latency
time)

/**
 * Initialize dbus.pass the unique application name as argument. this name will
 * be used to identify the dbus object and path name in the later calls.
 * @param appname unique application identifier name.
 * @return 0 on success error code otherwise.
 */
int dbus_client_init (char *appname)
{
    struct timespec   wait = {0};

    flogd("dbustest.log","%s:%d Entry",__func__,__LINE__);
    DBusError err;
    int rc = 0;
    printf("appname=%s\n",appname);

    // lock mutex
    wait.tv_sec = 0;
    wait.tv_nsec = MS_TO_NS(MILLI_SEC);  // 500 msec

    //pthread_mutex_lock (&init_dbus_mutex);
   if(pthread_mutex_timedlock(&init_dbus_mutex, &wait) == 0)
    {
        flogd("dbustest.log","%s:%d Entry",__func__,__LINE__);
        if (init_dbus_flag == false)
        {
            // initialize the errors
                    dbus_error_init(&err);

        // connect to the system bus and check for errors
        txconn = dbus_bus_get(DBUS_BUS_SESSION, &err);
        if (dbus_error_is_set(&err))
        {
           flogd("dbustest.log","%s:%d txbus: Connection Error (%s)"
,__func__,__LINE__, err.message);
            dbus_error_free(&err);
            pthread_mutex_unlock (&init_dbus_mutex);
            return -1;
        }
        if (NULL == txconn)
        {
            dbus_error_free(&err);
            pthread_mutex_unlock (&init_dbus_mutex);
            return -1;
        }

        // request our name on the bus
        sprintf(txbusname + strlen(txbusname), "%s", appname);
        rc = dbus_bus_request_name(txconn, txbusname,
DBUS_NAME_FLAG_REPLACE_EXISTING , &err);
        if (dbus_error_is_set(&err) || rc == -1)
        {
           flogd("dbustest.log", "%s:%d txbus: Name Error (%s)"
,__func__,__LINE__, err.message);
            dbus_error_free(&err);
            pthread_mutex_unlock (&init_dbus_mutex);
            return -1;
        }

        // connect to the system bus and check for errors
        rxconn = dbus_bus_get(DBUS_BUS_SESSION, &err);
        if (dbus_error_is_set(&err))
        {
           flogd("dbustest.log","%s:%d rxbus: Connection Error (%s)"
,__func__,__LINE__, err.message);
            dbus_error_free(&err);
            pthread_mutex_unlock (&init_dbus_mutex);
            return -1;
        }
        if (NULL == rxconn)
        {
            dbus_error_free(&err);
            pthread_mutex_unlock (&init_dbus_mutex);
            return -1;
        }

        // request our name on the bus
//        sprintf(rxbusname + strlen(rxbusname), "%d", getpid());
        sprintf(rxbusname + strlen(rxbusname), "%s", appname);
        printf("%s\n",rxbusname);
        rc = dbus_bus_request_name(rxconn, rxbusname,
DBUS_NAME_FLAG_REPLACE_EXISTING , &err);
        if (dbus_error_is_set(&err) || rc == -1)
        {
           flogd("dbustest.log", "%s:%d rxbus: Name Error (%s)"
,__func__,__LINE__, err.message);
            dbus_error_free(&err);
            pthread_mutex_unlock (&init_dbus_mutex);
            return -1;
        }

            dbus_error_free(&err);
            init_dbus_flag = true;
        }
        else
        {
            flogv("dbustest.log","%s:%d Dbus was already initialized"
,__func__,__LINE__);
        }

        pthread_mutex_unlock (&init_dbus_mutex);
    }
    else
    {
        flogd("dbustest.log","%s:%d Exit (no errors)",__func__,__LINE__);
        return -1;
    }
    return 0;
}

int dbus_client_publish (char *method, void *payload, int payloadlen)
{
   DBusMessage *msg;
   DBusMessageIter args;
   DBusMessageIter subiter;
   DBusError err;

   int returnValue = -1;
   int SignalRetryCount = 0;
   int SendRetryCount = 0;
   int maxRetries = 3;
   int successConnect = 0;
   int successSend = 0;

   char *pload = (char*) payload;

   if(pload != NULL)
   {
      // initialize the errors
      dbus_error_init (&err);

      while (SignalRetryCount < maxRetries && !successConnect)
      {

         // create a new method call and check for errors
         msg = dbus_message_new_signal (DBUS_OBJECT_NAME,
// object name of the signal
                                        DBUS_INTERFACE_NAME,
// interface name of the signal
                                        method); // name of the signal

         if(NULL == msg)
         {
            dbus_error_free (&err);

            SignalRetryCount++;
         }
         else
         {
            // append arguments
            dbus_message_iter_init_append (msg, &args);


// For appending args, doc says to use the following 3 for container types.
            // We can view the cmd buffer as a char buffer and send
            dbus_message_iter_open_container (&args, DBUS_TYPE_ARRAY, "y"
, &subiter);
            dbus_message_iter_append_fixed_array (&subiter,
DBUS_TYPE_BYTE, &pload, payloadlen);

            dbus_message_iter_close_container (&args, &subiter);

            // We probably don't need a reply

// flogv("dbustest.log","%s:%d dbus_connection_send called",__func__,__LINE__);

            while (SendRetryCount < maxRetries && !successSend)
            {
               if(!dbus_connection_send (txconn, msg, NULL))
               {

//  flogd("dbustest.log","%s:%d Out Of Memory!",__func__,__LINE__);
                  SendRetryCount++;
               }
               else
               {
                  successConnect = 1;
                  successSend = 1;
                  returnValue = 0;
                  break;
               }
            }
         }
      }

      dbus_connection_flush (txconn);

      // free message
      dbus_message_unref (msg);
      dbus_error_free (&err);

      //flogd("dbustest.log","%s:%d Exit (no errors)",__func__,__LINE__);
   }

   // print ( dbus_client_publish, returnValue)

   return returnValue;
}
/* This API subscribes for a particular busname/interface/method. The
callback would
 * be called once the subscribed information is available.
 *Note: subscribe = listen => server bus name (e.g., mqtt.server.cmdhndlr).
 *
 * method:    method name you want to listen to.
 * cb:        callback to be called once the subscribed information is
available.
 * return:    0 on success, -1 on failure.
 */
int dbus_client_subscribe (int num_pairs, ...)
{
    flogd("dbustest.log","%s:%d Entry",__func__,__LINE__);
    int i;
    va_list vargs;
    rx_thread_args_t *args;

    if (num_pairs > 32)
        return -1;

    args = (rx_thread_args_t *) malloc (sizeof(rx_thread_args_t));
    if (args == NULL)
        return -1;

    args->num_pairs = num_pairs;
    va_start(vargs, num_pairs);
    for (i = 0; i < num_pairs; i++)
    {
        char *m = va_arg(vargs, char *);
        sprintf (args->mthod_cb[i].method, "%s", m);
        args->mthod_cb[i].cb = va_arg(vargs, void *);
    }
    va_end(vargs);

    if
 (pthread_create (&dbus_client_rx_thread, NULL, dbus_client_receive_thread, (
void *)args) != 0)
    {
        floge("dbustest.log",
"%s:%d Client Error: Rx Thread creation failed; errno = %d (%s)"
,__func__,__LINE__, errno, strerror(errno));
        return -1;
    }


/* TODO: Detach dbus_client_receive_thread. (or exit self in the created thread)
     * Cannot be joined here as it will block the client process.
     */
    flogd("dbustest.log","%s:%d exit",__func__,__LINE__);
    return 0;
}

Please correct me, if anything is missing in my implementation.


Regards,
Deepak Jewargi
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.freedesktop.org/archives/dbus/attachments/20230523/21f8fd61/attachment-0001.htm>


More information about the dbus mailing list