dbus message queue latency time is not constant for same data payload
deepak jewargi
djewargi at gmail.com
Tue May 23 04:26:39 UTC 2023
Hi All,
I have implemented a D-Bus application using the dbus-1 APIs on a Linux
platform, and I am profiling it on an Ubuntu machine. I am seeing the issue
described below.
The challenges I am facing are as follows:
1. I have implemented D-Bus communication using signals: D-Bus
initialization, D-Bus publish, and D-Bus subscribe.
2. I am profiling D-Bus messages transmitted from one process and received
by another process using D-Bus signals on an Ubuntu Linux machine. It works
fine for the first 15 minutes, with an average latency of 500 microseconds.
After 30 minutes, 20-30 messages show a latency of 80 milliseconds.
Messages are transmitted at 1-millisecond intervals.
The latency between transmitting from one process and receiving in the
other process is not constant for the same message payload.
3. Is there any benchmark for the message transmit-to-receive time (latency)?
/**
* Initialize dbus.pass the unique application name as argument. this name will
* be used to identify the dbus object and path name in the later calls.
* @param appname unique application identifier name.
* @return 0 on success error code otherwise.
*/
int dbus_client_init (char *appname)
{
struct timespec wait = {0};
flogd("dbustest.log","%s:%d Entry",__func__,__LINE__);
DBusError err;
int rc = 0;
printf("appname=%s\n",appname);
// lock mutex
wait.tv_sec = 0;
wait.tv_nsec = MS_TO_NS(MILLI_SEC); // 500 msec
//pthread_mutex_lock (&init_dbus_mutex);
if(pthread_mutex_timedlock(&init_dbus_mutex, &wait) == 0)
{
flogd("dbustest.log","%s:%d Entry",__func__,__LINE__);
if (init_dbus_flag == false)
{
// initialize the errors
dbus_error_init(&err);
// connect to the system bus and check for errors
txconn = dbus_bus_get(DBUS_BUS_SESSION, &err);
if (dbus_error_is_set(&err))
{
flogd("dbustest.log","%s:%d txbus: Connection Error (%s)"
,__func__,__LINE__, err.message);
dbus_error_free(&err);
pthread_mutex_unlock (&init_dbus_mutex);
return -1;
}
if (NULL == txconn)
{
dbus_error_free(&err);
pthread_mutex_unlock (&init_dbus_mutex);
return -1;
}
// request our name on the bus
sprintf(txbusname + strlen(txbusname), "%s", appname);
rc = dbus_bus_request_name(txconn, txbusname,
DBUS_NAME_FLAG_REPLACE_EXISTING , &err);
if (dbus_error_is_set(&err) || rc == -1)
{
flogd("dbustest.log", "%s:%d txbus: Name Error (%s)"
,__func__,__LINE__, err.message);
dbus_error_free(&err);
pthread_mutex_unlock (&init_dbus_mutex);
return -1;
}
// connect to the system bus and check for errors
rxconn = dbus_bus_get(DBUS_BUS_SESSION, &err);
if (dbus_error_is_set(&err))
{
flogd("dbustest.log","%s:%d rxbus: Connection Error (%s)"
,__func__,__LINE__, err.message);
dbus_error_free(&err);
pthread_mutex_unlock (&init_dbus_mutex);
return -1;
}
if (NULL == rxconn)
{
dbus_error_free(&err);
pthread_mutex_unlock (&init_dbus_mutex);
return -1;
}
// request our name on the bus
// sprintf(rxbusname + strlen(rxbusname), "%d", getpid());
sprintf(rxbusname + strlen(rxbusname), "%s", appname);
printf("%s\n",rxbusname);
rc = dbus_bus_request_name(rxconn, rxbusname,
DBUS_NAME_FLAG_REPLACE_EXISTING , &err);
if (dbus_error_is_set(&err) || rc == -1)
{
flogd("dbustest.log", "%s:%d rxbus: Name Error (%s)"
,__func__,__LINE__, err.message);
dbus_error_free(&err);
pthread_mutex_unlock (&init_dbus_mutex);
return -1;
}
dbus_error_free(&err);
init_dbus_flag = true;
}
else
{
flogv("dbustest.log","%s:%d Dbus was already initialized"
,__func__,__LINE__);
}
pthread_mutex_unlock (&init_dbus_mutex);
}
else
{
flogd("dbustest.log","%s:%d Exit (no errors)",__func__,__LINE__);
return -1;
}
return 0;
}
/*
 * Build a signal on DBUS_OBJECT_NAME / DBUS_INTERFACE_NAME named 'method'
 * whose only argument is a byte array ("ay") holding 'len' bytes of 'bytes'.
 * Returns a new message the caller must unref, or NULL on failure.
 */
static DBusMessage *dbus_client_build_signal (const char *method,
                                              const char *bytes, int len)
{
    DBusMessage *msg;
    DBusMessageIter args;
    DBusMessageIter subiter;

    msg = dbus_message_new_signal (DBUS_OBJECT_NAME,    /* object name of the signal */
                                   DBUS_INTERFACE_NAME, /* interface name of the signal */
                                   method);             /* name of the signal */
    if (NULL == msg)
    {
        return NULL;
    }

    /* The payload is viewed as a plain char buffer and sent as one array. */
    dbus_message_iter_init_append (msg, &args);
    if (!dbus_message_iter_open_container (&args, DBUS_TYPE_ARRAY, "y", &subiter))
    {
        goto discard;
    }
    /* The API takes the ADDRESS of the pointer to the array, hence &bytes. */
    if (!dbus_message_iter_append_fixed_array (&subiter, DBUS_TYPE_BYTE,
                                               &bytes, len))
    {
        /* A container is still open; release what open_container allocated. */
        dbus_message_iter_abandon_container (&args, &subiter);
        goto discard;
    }
    if (!dbus_message_iter_close_container (&args, &subiter))
    {
        goto discard;
    }
    return msg;

discard:
    /* A partially built message cannot be reused; start over from scratch. */
    dbus_message_unref (msg);
    return NULL;
}

/**
 * Publish a payload as a D-Bus signal on the transmit connection (txconn).
 *
 * Message construction and queuing are each attempted up to three times.
 * The previous implementation could spin forever (leaking one message per
 * iteration) when queuing kept failing, and passed NULL to
 * dbus_message_unref() when construction kept failing; both paths now end
 * with a plain -1 return.
 *
 * @param method     name of the signal to emit; must not be NULL.
 * @param payload    bytes to send; must not be NULL.
 * @param payloadlen number of bytes in payload; must not be negative.
 * @return 0 if the message was queued and flushed, -1 otherwise.
 */
int dbus_client_publish (char *method, void *payload, int payloadlen)
{
    const int max_retries = 3;
    const char *pload = (const char *) payload;
    DBusMessage *msg = NULL;
    int return_value = -1;
    int attempt;

    if (pload == NULL || method == NULL || payloadlen < 0 || txconn == NULL)
    {
        return -1;
    }

    /* create the signal message, retrying a bounded number of times */
    for (attempt = 0; attempt < max_retries && msg == NULL; attempt++)
    {
        msg = dbus_client_build_signal (method, pload, payloadlen);
    }
    if (msg == NULL)
    {
        return -1;
    }

    /* Queue the message; no reply is expected for a signal, so no serial is
     * requested. The retry counter always advances, so this terminates. */
    for (attempt = 0; attempt < max_retries; attempt++)
    {
        if (dbus_connection_send (txconn, msg, NULL))
        {
            return_value = 0;
            break;
        }
    }

    if (return_value == 0)
    {
        /* Blocks until the outgoing queue has been written to the socket. */
        dbus_connection_flush (txconn);
    }

    /* free message */
    dbus_message_unref (msg);
    return return_value;
}
/**
 * Subscribe to a set of signal/method names and start the receive thread.
 *
 * The variadic arguments are num_pairs consecutive (char *method, callback)
 * pairs. Each method name is copied into a heap-allocated rx_thread_args_t,
 * each callback pointer is stored next to it, and the structure is handed to
 * dbus_client_receive_thread via pthread_create().
 *
 * Note: subscribe = listen => server bus name (e.g., mqtt.server.cmdhndlr).
 *
 * @param num_pairs number of (method, callback) pairs that follow; 0..32.
 * @return 0 on success, -1 on failure (bad arguments, allocation failure,
 *         a method name that does not fit, or thread creation failure).
 */
int dbus_client_subscribe (int num_pairs, ...)
{
    va_list vargs;
    rx_thread_args_t *args;
    int bad_arg = 0;
    int rc;
    int i;

    flogd("dbustest.log","%s:%d Entry",__func__,__LINE__);

    /* NOTE(review): 32 is presumably the capacity of mthod_cb[] -- confirm
     * against the rx_thread_args_t definition. */
    if (num_pairs < 0 || num_pairs > 32)
    {
        return -1;
    }

    args = malloc (sizeof *args);
    if (args == NULL)
    {
        return -1;
    }
    args->num_pairs = num_pairs;

    va_start(vargs, num_pairs);
    for (i = 0; i < num_pairs; i++)
    {
        const char *m = va_arg(vargs, char *);
        int written;

        if (m == NULL)
        {
            bad_arg = 1;
            break;
        }
        /* Bounded copy. NOTE(review): this assumes 'method' is a char array
         * member (the original sprintf'd into it straight after malloc);
         * sizeof would be wrong if it were a pointer -- confirm. */
        written = snprintf (args->mthod_cb[i].method,
                            sizeof args->mthod_cb[i].method, "%s", m);
        if (written < 0 || (size_t) written >= sizeof args->mthod_cb[i].method)
        {
            bad_arg = 1;
            break;
        }
        args->mthod_cb[i].cb = va_arg(vargs, void *);
    }
    va_end(vargs);

    if (bad_arg)
    {
        free (args);
        return -1;
    }

    /* pthread_create() reports failure through its return value and does not
     * set errno, so the returned code is what gets logged. */
    rc = pthread_create (&dbus_client_rx_thread, NULL,
                         dbus_client_receive_thread, (void *) args);
    if (rc != 0)
    {
        floge("dbustest.log",
              "%s:%d Client Error: Rx Thread creation failed; errno = %d (%s)"
              ,__func__,__LINE__, rc, strerror(rc));
        /* No thread took the arguments, so release them here. */
        free (args);
        return -1;
    }

    /* TODO: Detach dbus_client_receive_thread. (or exit self in the created thread)
     * Cannot be joined here as it will block the client process.
     * NOTE(review): on success 'args' now belongs to the receive thread;
     * whether that thread frees it is not visible here -- confirm.
     */
    flogd("dbustest.log","%s:%d exit",__func__,__LINE__);
    return 0;
}
Please let me know if anything is missing or incorrect in my implementation.
Regards,
Deepak Jewargi
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.freedesktop.org/archives/dbus/attachments/20230523/21f8fd61/attachment-0001.htm>
More information about the dbus
mailing list