1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
# vim:fileencoding=utf-8
# (c) 2011-2012 Michał Górny <mgorny@gentoo.org>
# Released under the terms of the 2-clause BSD license.
import dbus, os, signal, tempfile
from dbus.mainloop.glib import DBusGMainLoop
dbus_interface_name = 'org.gentoo.pmstestsuite'
dbus_bus_name = dbus_interface_name
dbus_object_prefix = '/org/gentoo/pmstestsuite'
dbus_config = """<?xml version="1.0"?>
<!DOCTYPE busconfig PUBLIC "-//freedesktop//DTD D-BUS Bus Configuration 1.0//EN"
"http://www.freedesktop.org/standards/dbus/1.0/busconfig.dtd">
<busconfig>
<type>session</type>
<listen>unix:tmpdir=/tmp</listen>
<user>%s</user>
<policy context="default">
<allow user="%s"/>
<allow send_destination="*" eavesdrop="true"/>
<allow eavesdrop="true"/>
<allow own="*"/>
</policy>
</busconfig>"""
class DBusHandler(object):
""" A class handling all D-Bus interaction for PMS Test Suite. """
def start_dbus(self, uid):
tmpf = tempfile.NamedTemporaryFile('w')
tmpf.write(dbus_config % (uid, os.getuid()))
tmpf.flush()
(read_fd, write_fd) = os.pipe()
self.dbus_pid = os.fork()
if self.dbus_pid == 0:
os.close(read_fd)
os.execlp('dbus-daemon', 'dbus-daemon', '--nofork',
'--config-file=%s' % tmpf.name,
'--print-address=%d' % write_fd)
else:
addr = os.read(read_fd, 1024)
os.close(write_fd)
tmpf.close()
self.bus_address = addr.strip()
def terminate(self):
os.kill(self.dbus_pid, signal.SIGTERM)
def __init__(self, uid):
""" Initialize DBusHandler. Add it to main GLib loop. """
DBusGMainLoop(set_as_default=True)
self.start_dbus(uid)
os.environ['DBUS_SESSION_BUS_ADDRESS'] = self.bus_address
self.bus = dbus.SessionBus()
self.busname = dbus.service.BusName(dbus_bus_name, self.bus)
|