Error "free(): double free detected in tcache 2" when querying info from libqmi via Python API

Martin Maurer martin.maurer at mmeacs.de
Thu Sep 14 15:39:35 UTC 2023


Hello,

I am trying to learn how to use the Python API of libqmi.
I extended the simple-tester-python.py to press a key and query some 
information.
When I query the revision, the correct revision is returned, but 
afterwards I get the error message “free(): double free detected in 
tcache 2”.
I tried the same with the model and see the same behaviour: the model is 
printed correctly, but then the same error message appears.

When I comment out outputA.get_revision() as well as outputB.get_model() 
(and not print the content, of course),
the error message disappears.
I saw the following note in the documentation ( 
https://lazka.github.io/pgi-docs/Qmi-1.0/classes/ClientDms.html#Qmi.ClientDms.get_revision_finish 
):
“The returned value should be freed with 
Qmi.MessageDmsGetModelOutput.unref().”
But calling unref() does not seem to make a difference.

Can someone give me a hint, how I can debug and fix this?
Where can I find the source code (python and/or C), which is executed, 
when a Python function like get_revision() or get_model() is called,
till something is executed with/in libqmi? Or do I then directly need to 
debug inside libqmi?

PS: I have attached the complete Python source code of my current 
test program. The extension was renamed from *.py to *.txt so that mail 
servers do not filter the file. Extracts are included below in this email.
A key press of ‘r’ triggers the "get revision" action, and ‘m’ queries 
the radio module model. This is all just for testing and far from 
cleaned-up code!

I prepared the Linux environment with these commands (on a Raspberry Pi 
4 with the latest Raspberry Pi OS from May 2023)

(only one time needed)

sudo apt install python3-gi

sudo apt install libqmi-glib-dev

I execute it like the following:

sudo python3 connection-setup-bugreport.py /dev/cdc-wdm0

I assume it does not matter, but I use a Quectel module connected via USB.

Many thanks!

Best regards,
Martin

Output of my program (once pressing key "r", then restarting program and 
pressing key "m"):

$ sudo python3 connection-setup.py /dev/cdc-wdm0
new_ready
wwan0
open_ready
allocate_client_DMS_ready
allocate_client_DMS_ready: successful
CID 45
allocate_client_WDS_ready
allocate_client_WDS_ready: successful
<Qmi.ClientWds object at 0x7f93b22a00 (QmiClientWds at 0x3b904790)>
CID 46
Call function r
get_revision_ready
Hallo A
2
Hallo B
revision:              RM520NGLAAR01A07M4G
free(): double free detected in tcache 2
Aborted
$ sudo python3 connection-setup.py /dev/cdc-wdm0
new_ready
wwan0
open_ready
allocate_client_DMS_ready
allocate_client_DMS_ready: successful
CID 46
allocate_client_WDS_ready
allocate_client_WDS_ready: successful
<Qmi.ClientWds object at 0x7f94076e00 (QmiClientWds at 0x4dd9790)>
CID 47
Call function m
get_model_ready
Hallo A
2
Hallo B
model:                 RM520N-GL
free(): double free detected in tcache 2
Aborted
$


def get_revision_ready(qmiclient,result,qmidev):
     """Completion callback for ``qmiclient.get_revision()``; prints the revision.

     Args:
         qmiclient: Client the request was issued on; its
             ``get_revision_finish()`` is called with ``result``.
         result: Async result handed to this callback.
         qmidev: User data given to ``get_revision()``; not used here.

     A ``GLib.GError`` raised while finishing or reading the response is
     reported on stderr instead of propagating.
     """
     print("get_revision_ready")

     try:
          output = qmiclient.get_revision_finish(result)
          # Return value intentionally ignored; presumably a failed request
          # surfaces as GLib.GError, which is handled below.
          output.get_result()

          available, revision = output.get_revision()

          # NOTE(review): output.unref() is deliberately not called here.
          # The Python binding presumably manages the reference on the
          # returned output object itself -- confirm against PyGObject docs.

          # Diagnostic prints kept from the bug report so the output stays
          # comparable with the log shown in the report.
          print("Hallo A")

          print(sys.getrefcount(output))

          print("Hallo B")

          if available:
               print("revision:              %s" % revision)
          else:
               # Previously said "capabilities" (copy/paste from another callback).
               print("revision:              -> error getting revision")

     except GLib.GError as error:
          sys.stderr.write("Couldn't query revision: %s\n" % error.message)


def get_model_ready(qmiclient,result,qmidev):
     """Completion callback for ``qmiclient.get_model()``; prints the model.

     Args:
         qmiclient: Client the request was issued on; its
             ``get_model_finish()`` is called with ``result``.
         result: Async result handed to this callback.
         qmidev: User data given to ``get_model()``; not used here.

     A ``GLib.GError`` raised while finishing or reading the response is
     reported on stderr instead of propagating.
     """
     print("get_model_ready")

     try:
          output = qmiclient.get_model_finish(result)
          # Return value intentionally ignored; presumably a failed request
          # surfaces as GLib.GError, which is handled below.
          output.get_result()

          available, model = output.get_model()

          # NOTE(review): output.unref() is deliberately not called here.
          # The Python binding presumably manages the reference on the
          # returned output object itself -- confirm against PyGObject docs.

          # Diagnostic prints kept from the bug report so the output stays
          # comparable with the log shown in the report.
          print("Hallo A")

          print(sys.getrefcount(output))

          print("Hallo B")

          if available:
               print("model:                 %s" % model)
          else:
               # Previously said "capabilities" (copy/paste from another callback).
               print("model:                 -> error getting model")

     except GLib.GError as error:
          sys.stderr.write("Couldn't query model: %s\n" % error.message)


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.freedesktop.org/archives/libqmi-devel/attachments/20230914/9cea6a86/attachment-0001.htm>
-------------- next part --------------
#!/usr/bin/env python3
# -*- Mode: python; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 2 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright (C) 2020 Aleksander Morgado <aleksander at aleksander.es>
#

import sys, signal, gi

gi.require_version('Qmi', '1.0')
from gi.repository import GLib, Gio, Qmi

# import keyboard

import termios
import atexit
from select import select

# import readchar

# def on_key_event(event):
#     print("on_key_event")
#     if event.event_type == keyboard.KEY_DOWN:
#         print("Enter key pressed")

# Module-level state shared between the asynchronous callbacks below.

# Replaced by a GLib.MainLoop in the __main__ section; callbacks call quit() on it.
main_loop = None

# Set by new_ready() once Qmi.Device.new_finish() succeeded.
qmidev = None

# Set by allocate_client_WDS_ready() / allocate_client_DMS_ready(); they stay
# None until the respective allocation has completed successfully.
qmiclient_WDS = None
qmiclient_DMS = None



def signal_handler(data):
    """Stop the main loop; installed for SIGHUP and SIGTERM in the main section.

    ``data`` is the user data given at registration time and is ignored.
    """
    print("signal_handler")
    loop = main_loop
    loop.quit()


def device_close_ready(qmidev, result, user_data=None):
    """Completion callback for ``close_async()``.

    Finishes the close operation, reports a ``GLib.GError`` on stderr if one
    is raised, and always prints "Query done" afterwards. The main loop is
    left running.
    """
    print("device_close_ready")
    try:
        qmidev.close_finish(result)
    except GLib.GError as close_error:
        message = "Couldn't close QMI device: %s\n" % close_error.message
        sys.stderr.write(message)
    print("Query done")


def device_close(qmidev):
    """Start an asynchronous close of ``qmidev``.

    The outcome is delivered to ``device_close_ready``.
    """
    print("device_close")
    close_args = (10, None, device_close_ready, None)
    qmidev.close_async(*close_args)


def release_client_ready(qmidev, result, user_data=None):
    """Completion callback for ``qmidev.release_client()``.

    Finishes the release, reports a ``GLib.GError`` on stderr if one is
    raised, and then closes the device in either case.
    """
    print("release_client_ready")
    try:
        qmidev.release_client_finish(result)
    except GLib.GError as release_error:
        message = "Couldn't release QMI client: %s\n" % release_error.message
        sys.stderr.write(message)
    device_close(qmidev)


def release_client(qmidev, qmiclient):
    """Ask ``qmidev`` to release ``qmiclient`` with the RELEASE_CID flag.

    The outcome is delivered to ``release_client_ready``.
    """
    print("release_client")
    flags = Qmi.DeviceReleaseClientFlags.RELEASE_CID
    qmidev.release_client(qmiclient, flags, 10, None, release_client_ready, None)


def get_ids_ready(qmiclient,result,qmidev):
    """Completion callback for ``qmiclient.get_ids()``; prints the device ids.

    Args:
        qmiclient: Client the request was issued on.
        result: Async result handed to this callback.
        qmidev: Device passed as user data; used to release the client when
            the request as a whole failed.

    If finishing the request raises ``GLib.GError`` the error is reported on
    stderr, the client is released and nothing is printed. Otherwise every id
    is read independently, so one unreadable id does not hide the others.
    """
    print("get_ids_ready")
    try:
        output = qmiclient.get_ids_finish(result)
        output.get_result()
    except GLib.GError as error:
        sys.stderr.write("Couldn't query device ids: %s\n" % error.message)
        release_client(qmidev, qmiclient)
        return

    # (getter name, format used when found, text printed when not found)
    fields = (
        ("get_imei",
         "imei:                  %s",
         "imei:                  -> error getting imei"),
        ("get_imei_software_version",
         "imei software version: %s",
         "imei software version: -> error getting imei_software_version"),
        ("get_meid",
         "meid:                  %s",
         "meid:                  -> error getting meid"),
        ("get_esn",
         "esn:                   %s",
         "esn:                   -> error getting esn"),
    )

    for getter_name, found_format, missing_text in fields:
        try:
            found, value = getattr(output, getter_name)()
        except GLib.GError:
            # Best effort: an id that cannot be read is skipped silently.
            # Only GLib.GError is swallowed (the original used a bare
            # "except:"), so KeyboardInterrupt and programming errors are
            # no longer hidden.
            continue

        if found:
            print(found_format % value)
        else:
            print(missing_text)


def get_capabilities_ready(qmiclient,result,qmidev):
    """Completion callback for ``qmiclient.get_capabilities()``.

    Prints the capability fields returned by ``output.get_info()`` (or a
    placeholder line per field when the info is not available). A
    ``GLib.GError`` is reported on stderr. In every case the device ids are
    queried next via ``qmiclient.get_ids()``.

    Args:
        qmiclient: Client the request was issued on.
        result: Async result handed to this callback.
        qmidev: Device, forwarded as user data to ``get_ids_ready``.
    """
    print("get_capabilities_ready")
    try:
        output = qmiclient.get_capabilities_finish(result)
        output.get_result()

        found, maxtxrate, maxrxrate, dataservicecaps, simcaps, radioifaces = output.get_info()
        if found:
            print("max tx channel rate:   %u" % maxtxrate)
            print("max rx channel rate:   %u" % maxrxrate)
            print("data service:          %s" % Qmi.DmsDataServiceCapability.get_string(dataservicecaps))
            print("sim:                   %s" % Qmi.DmsSimCapability.get_string(simcaps))
            # One comma-separated list of radio interface names.
            networks = ", ".join(
                Qmi.DmsRadioInterface.get_string(radioiface)
                for radioiface in radioifaces
            )
            print("networks:              %s" % networks)
        else:
            print("max tx channel rate:   -> error getting capabilities")
            print("max rx channel rate:   -> error getting capabilities")
            print("data service:          -> error getting capabilities")
            print("sim:                   -> error getting capabilities")
            print("networks:              -> error getting capabilities")

    except GLib.GError as error:
        sys.stderr.write("Couldn't query device capabilities: %s\n" % error.message)

    # Chain the next query regardless of whether the capabilities were read.
    qmiclient.get_ids(None, 10, None, get_ids_ready, qmidev)


def get_revision_ready(qmiclient,result,qmidev):
    """Completion callback for ``qmiclient.get_revision()``; prints the revision.

    Args:
        qmiclient: Client the request was issued on; its
            ``get_revision_finish()`` is called with ``result``.
        result: Async result handed to this callback.
        qmidev: User data given to ``get_revision()``; not used here.

    A ``GLib.GError`` raised while finishing or reading the response is
    reported on stderr instead of propagating. The client is not released
    here; ``testme`` issues further requests on the same client.
    """
    print("get_revision_ready")

    try:
        output = qmiclient.get_revision_finish(result)
        # Return value intentionally ignored; presumably a failed request
        # surfaces as GLib.GError, which is handled below.
        output.get_result()

        available, revision = output.get_revision()

        # NOTE(review): output.unref() is deliberately not called here.
        # The Python binding presumably manages the reference on the
        # returned output object itself -- confirm against PyGObject docs.

        # Diagnostic prints kept from the bug report so the output stays
        # comparable with the log shown in the report.
        print("Hallo A")

        print(sys.getrefcount(output))

        print("Hallo B")

        if available:
            print("revision:              %s" % revision)
        else:
            # Previously said "capabilities" (copy/paste from another callback).
            print("revision:              -> error getting revision")

    except GLib.GError as error:
        sys.stderr.write("Couldn't query revision: %s\n" % error.message)
    
    
def get_model_ready(qmiclient,result,qmidev):
    """Completion callback for ``qmiclient.get_model()``; prints the model.

    Args:
        qmiclient: Client the request was issued on; its
            ``get_model_finish()`` is called with ``result``.
        result: Async result handed to this callback.
        qmidev: User data given to ``get_model()``; not used here.

    A ``GLib.GError`` raised while finishing or reading the response is
    reported on stderr instead of propagating. The client is not released
    here; ``testme`` issues further requests on the same client.
    """
    print("get_model_ready")

    try:
        output = qmiclient.get_model_finish(result)
        # Return value intentionally ignored; presumably a failed request
        # surfaces as GLib.GError, which is handled below.
        output.get_result()

        available, model = output.get_model()

        # NOTE(review): output.unref() is deliberately not called here.
        # The Python binding presumably manages the reference on the
        # returned output object itself -- confirm against PyGObject docs.

        # Diagnostic prints kept from the bug report so the output stays
        # comparable with the log shown in the report.
        print("Hallo A")

        print(sys.getrefcount(output))

        print("Hallo B")

        if available:
            print("model:                 %s" % model)
        else:
            # Previously said "capabilities" (copy/paste from another callback).
            print("model:                 -> error getting model")

    except GLib.GError as error:
        sys.stderr.write("Couldn't query model: %s\n" % error.message)


def allocate_client_DMS_ready(qmidev, result, user_data=None):
    """Completion callback for the DMS ``allocate_client()`` request.

    On success the new client is stored in the module-level
    ``qmiclient_DMS`` and its CID is printed. On ``GLib.GError`` the error is
    reported on stderr and the device is closed.
    """
    global qmiclient_DMS

    print("allocate_client_DMS_ready")
    try:
        dms_client = qmidev.allocate_client_finish(result)
    except GLib.GError as alloc_error:
        message = "Couldn't allocate QMI client DMS: %s\n" % alloc_error.message
        sys.stderr.write(message)
        device_close(qmidev)
        return

    # Publish the client only after the allocation finished without error.
    qmiclient_DMS = dms_client

    print("allocate_client_DMS_ready: successful")

    cid_text = str(qmiclient_DMS.get_cid())
    print("CID " + cid_text)
    

def allocate_client_WDS_ready(qmidev, result, user_data=None):
    """Completion callback for the WDS ``allocate_client()`` request.

    On success the new client is stored in the module-level
    ``qmiclient_WDS``, then the client object and its CID are printed. On
    ``GLib.GError`` the error is reported on stderr and the device is closed.
    """
    global qmiclient_WDS

    print("allocate_client_WDS_ready")
    try:
        wds_client = qmidev.allocate_client_finish(result)
    except GLib.GError as alloc_error:
        message = "Couldn't allocate QMI client WDS: %s\n" % alloc_error.message
        sys.stderr.write(message)
        device_close(qmidev)
        return

    # Publish the client only after the allocation finished without error.
    qmiclient_WDS = wds_client

    print("allocate_client_WDS_ready: successful")

    print(qmiclient_WDS)

    cid_text = str(qmiclient_WDS.get_cid())
    print("CID " + cid_text)


def open_ready(qmidev, result, user_data=None):
    """Completion callback for ``qmidev.open()``.

    When the open fails with ``GLib.GError`` the error is reported on stderr
    and the main loop is stopped. Otherwise a DMS client and then a WDS
    client are requested from the device.
    """
    print("open_ready")
    try:
        qmidev.open_finish(result)
    except GLib.GError as open_error:
        message = "Couldn't open QMI device: %s\n" % open_error.message
        sys.stderr.write(message)
        main_loop.quit()
        return

    # Same request for both services; only the service and callback differ.
    requests = (
        (Qmi.Service.DMS, allocate_client_DMS_ready),
        (Qmi.Service.WDS, allocate_client_WDS_ready),
    )
    for service, on_ready in requests:
        qmidev.allocate_client(service, Qmi.CID_NONE, 10, None, on_ready, None)


def new_ready(unused, result, user_data=None):
    """Completion callback for ``Qmi.Device.new()``.

    Stores the created device in the module-level ``qmidev``, prints its
    WWAN interface and starts opening it. When creation fails with
    ``GLib.GError`` the error is reported on stderr and the main loop is
    stopped.
    """
    global qmidev

    print("new_ready")
    try:
        qmidev = Qmi.Device.new_finish(result)
    except GLib.GError as new_error:
        message = "Couldn't create QMI device: %s\n" % new_error.message
        sys.stderr.write(message)
        main_loop.quit()
        return

    wwan_iface = qmidev.get_wwan_iface()
    print(wwan_iface)

    open_flags = Qmi.DeviceOpenFlags.PROXY | Qmi.DeviceOpenFlags.AUTO
    qmidev.open(open_flags, 10, None, open_ready, None)


def testme():
    """Poll stdin for one key press and run the matching test action.

    Registered as a GLib timeout callback. Keys:
    ``q`` quit, ``a`` create the QMI device again, ``b`` query capabilities
    (followed by the ids), ``r`` query the revision, ``m`` query the model,
    ``i`` print the client CIDs, ``?`` print the help.

    Keys that need a client report a message on stderr while that client has
    not been allocated yet, instead of failing on ``None``.
    """
    # NOTE(review): this callback returns None, so the timeout source that
    # invoked it is presumably dropped after each run; re-arming first keeps
    # the polling alive even if an action below raises.
    GLib.timeout_add(250, testme)

    # Zero timeout: only check whether a key is pending, never block the loop.
    readable, _, _ = select([sys.stdin], [], [], 0)
    if not readable:
        return

    key = sys.stdin.read(1)

    if key == "q":
        main_loop.quit()
    elif key == "a":
        print("Call function a")
        # Create QMI device asynchronously
        device_file = Gio.File.new_for_path(sys.argv[1])
        Qmi.Device.new(device_file, None, new_ready, None)
    elif key in ("b", "r", "m"):
        print("Call function %s" % key)
        if qmiclient_DMS is None:
            sys.stderr.write("DMS client not allocated yet\n")
        elif key == "b":
            qmiclient_DMS.get_capabilities(None, 10, None, get_capabilities_ready, qmidev)
        elif key == "r":
            qmiclient_DMS.get_revision(None, 10, None, get_revision_ready, qmidev)
        else:
            qmiclient_DMS.get_model(None, 10, None, get_model_ready, qmidev)
    elif key == "i":
        if qmiclient_DMS is None:
            sys.stderr.write("DMS client not allocated yet\n")
        else:
            print("CID(DMS) " + str(qmiclient_DMS.get_cid()))
        if qmiclient_WDS is None:
            sys.stderr.write("WDS client not allocated yet\n")
        else:
            print("CID(WDS) " + str(qmiclient_WDS.get_cid()))
    elif key == "?":
        print("<a> to create the QMI device again")
        print("<b> to query capabilities and ids")
        print("<r> to query the revision")
        print("<m> to query the model")
        print("<i> to print the client CIDs")
        print("<q> to quit")
    

# print('Reading a char:')
# print(repr(readchar.readchar()))
# print('Reading a key:')
# print(repr(readchar.readkey()))




def set_normal_term():
    ''' Restore the terminal attributes saved in ``old_term``.

    Registered with ``atexit`` in the main section. Reads the module-level
    ``fd`` and ``old_term``, which are only assigned there when keyboard
    handling is enabled.
    '''
    termios.tcsetattr(fd, termios.TCSAFLUSH, old_term)


if __name__ == "__main__":
    # Script entry point: validate arguments, optionally switch the terminal
    # to unbuffered/no-echo mode for single-key commands, create the QMI
    # device asynchronously and run the GLib main loop until it is quit.

    # Process input arguments
    if len(sys.argv) != 2:
        sys.stderr.write('error: wrong number of arguments\n')
        sys.stdout.write('usage: simple-tester-python <DEVICE>\n')
        sys.exit(1)

    keyboard_handling = True

    if keyboard_handling:
        # Save the terminal settings; set_normal_term() restores old_term.
        fd = sys.stdin.fileno()
        new_term = termios.tcgetattr(fd)
        old_term = termios.tcgetattr(fd)

        # New terminal setting unbuffered
        new_term[3] = (new_term[3] & ~termios.ICANON & ~termios.ECHO)
        termios.tcsetattr(fd, termios.TCSAFLUSH, new_term)

        # Support normal-terminal reset at exit
        atexit.register(set_normal_term)

        # Start polling the keyboard.
        GLib.timeout_add(250, testme)

    # Create QMI device asynchronously
    file = Gio.File.new_for_path(sys.argv[1])
    Qmi.Device.new(file, None, new_ready, None)

    # Main loop
    main_loop = GLib.MainLoop()
    GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGHUP, signal_handler, None)
    GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGTERM, signal_handler, None)
    try:
        main_loop.run()
    except KeyboardInterrupt:
        pass

    # The clients stay None when the loop ended before allocation completed
    # (e.g. device creation failed), so check before dereferencing them.
    if qmiclient_DMS is not None:
        print("CID(DMS) " + str(qmiclient_DMS.get_cid()))
    if qmiclient_WDS is not None:
        print("CID(WDS) " + str(qmiclient_WDS.get_cid()))

    # NOTE(review): the requests below are asynchronous but the main loop has
    # already stopped, so their completion callbacks presumably never run;
    # release_client_ready() would also call device_close() again. Confirm
    # whether a second loop run is wanted for a clean shutdown.
    if qmiclient_DMS is not None:
        print("qmiclient_DMS != None")
        qmidev.release_client(qmiclient_DMS, Qmi.DeviceReleaseClientFlags.RELEASE_CID, 10, None, release_client_ready, None)

    if qmiclient_WDS is not None:
        print("qmiclient_WDS != None")
        qmidev.release_client(qmiclient_WDS, Qmi.DeviceReleaseClientFlags.RELEASE_CID, 10, None, release_client_ready, None)

    if qmidev is not None:
        print("qmidev != None")
        device_close(qmidev)


More information about the libqmi-devel mailing list