[PATCH 17/17] test/test-p2p.py: Added. Test "peer-to-peer" connections.

John (J5) Palmieri johnp at redhat.com
Mon Apr 30 15:12:11 PDT 2007


great!!!

On Mon, 2007-04-30 at 13:38 +0100, Simon McVittie wrote:
> (Actually tested by connecting to the bus daemon, because I haven't implemented
> a Python binding for DBusServer yet.)
> 
> diff --git a/test/Makefile.am b/test/Makefile.am
> index 58e4da8..ef2d529 100644
> --- a/test/Makefile.am
> +++ b/test/Makefile.am
> @@ -5,6 +5,7 @@ EXTRA_DIST = \
>  	     run-test.sh \
>  	     run-with-tmp-session-bus.sh \
>  	     test-client.py \
> +	     test-p2p.py \
>  	     test-service.py \
>  	     test-signals.py \
>  	     test-standalone.py \
> diff --git a/test/run-test.sh b/test/run-test.sh
> index 6058afa..2661d3b 100755
> --- a/test/run-test.sh
> +++ b/test/run-test.sh
> @@ -89,6 +89,9 @@ $PYTHON "$DBUS_TOP_SRCDIR"/test/test-client.py || die "test-client.py failed"
>  echo "running test-signals.py"
>  $PYTHON "$DBUS_TOP_SRCDIR"/test/test-signals.py || die "test-signals.py failed"
>  
> +echo "running test-p2p.py"
> +$PYTHON "$DBUS_TOP_SRCDIR"/test/test-p2p.py || die "... failed"
> +
>  rm -f "$DBUS_TOP_BUILDDIR"/test/test-service.log
>  rm -f "$DBUS_TOP_BUILDDIR"/test/cross-client.log
>  rm -f "$DBUS_TOP_BUILDDIR"/test/cross-server.log
> diff --git a/test/test-p2p.py b/test/test-p2p.py
> new file mode 100644
> index 0000000..11b44e8
> --- /dev/null
> +++ b/test/test-p2p.py
> @@ -0,0 +1,91 @@
> +#!/usr/bin/env python
> +
> +# Copyright (C) 2004 Red Hat Inc. <http://www.redhat.com/>
> +# Copyright (C) 2005, 2006 Collabora Ltd. <http://www.collabora.co.uk/>
> +#
> +# Licensed under the Academic Free License version 2.1
> +#
> +# This program is free software; you can redistribute it and/or modify
> +# it under the terms of the GNU General Public License as published by
> +# the Free Software Foundation; either version 2 of the License, or
> +# (at your option) any later version.
> +#
> +# This program is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> +# GNU General Public License for more details.
> +#
> +# You should have received a copy of the GNU General Public License
> +# along with this program; if not, write to the Free Software
> +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
> +
> +
> +import sys
> +import os
> +import unittest
> +import time
> +import logging
> +import weakref
> +
> +builddir = os.path.normpath(os.environ["DBUS_TOP_BUILDDIR"])
> +pydir = os.path.normpath(os.environ["DBUS_TOP_SRCDIR"])
> +
> +import dbus
> +import _dbus_bindings
> +import gobject
> +import dbus.glib
> +import dbus.service
> +
> +
> +logging.basicConfig()
> +logging.getLogger().setLevel(1)
> +
> +
> +NAME = "org.freedesktop.DBus.TestSuitePythonService"
> +IFACE = "org.freedesktop.DBus.TestSuiteInterface"
> +OBJECT = "/org/freedesktop/DBus/TestSuitePythonObject"
> +
> +class TestDBusBindings(unittest.TestCase):
> +    # This test case relies on the test service already having been activated.
> +
> +    def get_conn_and_unique(self):
> +        # since I haven't implemented servers yet, I'll use the bus daemon
> +        # as our peer - note that there's no name tracking because we're not
> +        # using dbus.bus.BusConnection!
> +        conn = dbus.connection.Connection(
> +                os.environ['DBUS_SESSION_BUS_ADDRESS'])
> +        unique = conn.call_blocking('org.freedesktop.DBus',
> +                                    '/org/freedesktop/DBus',
> +                                    'org.freedesktop.DBus', 'Hello',
> +                                    '', (), utf8_strings=True)
> +        self.assert_(unique.__class__ == dbus.UTF8String, repr(unique))
> +        self.assert_(unique.startswith(':'), unique)
> +        conn.set_unique_name(unique)
> +        return conn, unique
> +
> +    def testCall(self):
> +        conn, unique = self.get_conn_and_unique()
> +        ret = conn.call_blocking(NAME, OBJECT, IFACE, 'Echo', 'v', ('V',))
> +        self.assertEquals(ret, 'V')
> +
> +    def testCallThroughProxy(self):
> +        conn, unique = self.get_conn_and_unique()
> +        proxy = conn.get_object(NAME, OBJECT)
> +        iface = dbus.Interface(proxy, IFACE)
> +        ret = iface.Echo('V')
> +        self.assertEquals(ret, 'V')
> +
> +    def testSetUniqueName(self):
> +        conn, unique = self.get_conn_and_unique()
> +        ret = conn.call_blocking(NAME, OBJECT, IFACE,
> +                                 'MethodExtraInfoKeywords', '', (),
> +                                 utf8_strings=True)
> +        self.assertEquals(ret, (unique, OBJECT, NAME,
> +                                'dbus.lowlevel.MethodCallMessage'))
> +
> +
> +if __name__ == '__main__':
> +    gobject.threads_init()
> +    dbus.glib.init_threads()
> +
> +    unittest.main()
> _______________________________________________
> dbus mailing list
> dbus at lists.freedesktop.org
> http://lists.freedesktop.org/mailman/listinfo/dbus



More information about the dbus mailing list