mirror of
https://github.com/mbusb/multibootusb
synced 2024-11-07 03:20:23 +00:00
224 lines
6.8 KiB
Python
224 lines
6.8 KiB
Python
#!/usr/bin/env python3
|
|
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai
|
|
# Name: udisks.py
|
|
# Purpose: Module to mount unmount and eject using dbus and udisk
|
|
# Authors: Original author is Kovid Goyal <kovid@kovidgoyal.net> and python3
|
|
# supporte by Sundar for multibootusb project
|
|
# Licence: 'GPL v3' as per original Licence
|
|
|
|
__license__ = 'GPL v3'
|
|
__copyright__ = '2010, Kovid Goyal <kovid@kovidgoyal.net>'
|
|
__docformat__ = 'restructuredtext en'
|
|
|
|
# from __future__ import print_function
|
|
import os, re
|
|
|
|
|
|
def node_mountpoint(node):
|
|
|
|
def de_mangle(raw):
|
|
return raw.replace('\\040', ' ').replace('\\011', '\t').replace('\\012',
|
|
'\n').replace('\\0134', '\\')
|
|
|
|
for line in open('/proc/mounts').readlines():
|
|
line = line.split()
|
|
if line[0] == node:
|
|
return de_mangle(line[1])
|
|
return None
|
|
|
|
|
|
class NoUDisks1(Exception):
|
|
pass
|
|
|
|
|
|
class UDisks(object):
|
|
|
|
def __init__(self):
|
|
import dbus
|
|
self.bus = dbus.SystemBus()
|
|
try:
|
|
self.main = dbus.Interface(self.bus.get_object('org.freedesktop.UDisks',
|
|
'/org/freedesktop/UDisks'), 'org.freedesktop.UDisks')
|
|
except dbus.exceptions.DBusException as e:
|
|
if getattr(e, '_dbus_error_name', None) == 'org.freedesktop.DBus.Error.ServiceUnknown':
|
|
raise NoUDisks1()
|
|
raise
|
|
|
|
def device(self, device_node_path):
|
|
import dbus
|
|
devpath = self.main.FindDeviceByDeviceFile(device_node_path)
|
|
return dbus.Interface(self.bus.get_object('org.freedesktop.UDisks',
|
|
devpath), 'org.freedesktop.UDisks.Device')
|
|
|
|
def mount(self, device_node_path):
|
|
d = self.device(device_node_path)
|
|
try:
|
|
return str(d.FilesystemMount('',
|
|
['auth_no_user_interaction', 'rw', 'noexec', 'nosuid',
|
|
'nodev', 'uid=%d'%os.geteuid(), 'gid=%d'%os.getegid()]))
|
|
except:
|
|
# May be already mounted, check
|
|
mp = node_mountpoint(str(device_node_path))
|
|
if mp is None:
|
|
raise
|
|
return mp
|
|
|
|
def unmount(self, device_node_path):
|
|
d = self.device(device_node_path)
|
|
d.FilesystemUnmount(['force'])
|
|
|
|
def eject(self, device_node_path):
|
|
parent = device_node_path
|
|
while parent[-1] in '0123456789':
|
|
parent = parent[:-1]
|
|
d = self.device(parent)
|
|
d.DriveEject([])
|
|
|
|
|
|
class NoUDisks2(Exception):
|
|
pass
|
|
|
|
|
|
class UDisks2(object):
|
|
|
|
BLOCK = 'org.freedesktop.UDisks2.Block'
|
|
FILESYSTEM = 'org.freedesktop.UDisks2.Filesystem'
|
|
DRIVE = 'org.freedesktop.UDisks2.Drive'
|
|
|
|
def __init__(self):
|
|
import dbus
|
|
self.bus = dbus.SystemBus()
|
|
try:
|
|
self.bus.get_object('org.freedesktop.UDisks2',
|
|
'/org/freedesktop/UDisks2')
|
|
except dbus.exceptions.DBusException as e:
|
|
if getattr(e, '_dbus_error_name', None) == 'org.freedesktop.DBus.Error.ServiceUnknown':
|
|
raise NoUDisks2()
|
|
raise
|
|
|
|
def device(self, device_node_path):
|
|
device_node_path = os.path.realpath(device_node_path)
|
|
devname = device_node_path.split('/')[-1]
|
|
|
|
# First we try a direct object path
|
|
bd = self.bus.get_object('org.freedesktop.UDisks2',
|
|
'/org/freedesktop/UDisks2/block_devices/%s'%devname)
|
|
try:
|
|
device = bd.Get(self.BLOCK, 'Device',
|
|
dbus_interface='org.freedesktop.DBus.Properties')
|
|
device = bytearray(device).replace(b'\x00', b'').decode('utf-8')
|
|
except:
|
|
device = None
|
|
|
|
if device == device_node_path:
|
|
return bd
|
|
|
|
# Enumerate all devices known to UDisks
|
|
devs = self.bus.get_object('org.freedesktop.UDisks2',
|
|
'/org/freedesktop/UDisks2/block_devices')
|
|
xml = devs.Introspect(dbus_interface='org.freedesktop.DBus.Introspectable')
|
|
for dev in re.finditer(r'name=[\'"](.+?)[\'"]', type('')(xml)):
|
|
bd = self.bus.get_object('org.freedesktop.UDisks2',
|
|
'/org/freedesktop/UDisks2/block_devices/%s2'%dev.group(1))
|
|
try:
|
|
device = bd.Get(self.BLOCK, 'Device',
|
|
dbus_interface='org.freedesktop.DBus.Properties')
|
|
device = bytearray(device).replace(b'\x00', b'').decode('utf-8')
|
|
except:
|
|
device = None
|
|
if device == device_node_path:
|
|
return bd
|
|
|
|
raise ValueError('%r not known to UDisks2'%device_node_path)
|
|
|
|
def mount(self, device_node_path):
|
|
d = self.device(device_node_path)
|
|
mount_options = ['rw', 'noexec', 'nosuid',
|
|
'nodev', 'uid=%d'%os.geteuid(), 'gid=%d'%os.getegid()]
|
|
try:
|
|
return str(d.Mount(
|
|
{
|
|
'auth.no_user_interaction':True,
|
|
'options':','.join(mount_options)
|
|
},
|
|
dbus_interface=self.FILESYSTEM))
|
|
except:
|
|
# May be already mounted, check
|
|
mp = node_mountpoint(str(device_node_path))
|
|
if mp is None:
|
|
raise
|
|
return mp
|
|
|
|
def unmount(self, device_node_path):
|
|
d = self.device(device_node_path)
|
|
d.Unmount({'force':True, 'auth.no_user_interaction':True},
|
|
dbus_interface=self.FILESYSTEM)
|
|
|
|
def drive_for_device(self, device):
|
|
drive = device.Get(self.BLOCK, 'Drive',
|
|
dbus_interface='org.freedesktop.DBus.Properties')
|
|
return self.bus.get_object('org.freedesktop.UDisks2', drive)
|
|
|
|
def eject(self, device_node_path):
|
|
drive = self.drive_for_device(self.device(device_node_path))
|
|
drive.Eject({'auth.no_user_interaction':True},
|
|
dbus_interface=self.DRIVE)
|
|
|
|
|
|
def get_udisks(ver=None):
|
|
if ver is None:
|
|
try:
|
|
u = UDisks2()
|
|
except NoUDisks2:
|
|
u = UDisks()
|
|
return u
|
|
return UDisks2() if ver == 2 else UDisks()
|
|
|
|
|
|
def get_udisks1():
|
|
u = None
|
|
try:
|
|
u = UDisks()
|
|
except NoUDisks1:
|
|
try:
|
|
u = UDisks2()
|
|
except NoUDisks2:
|
|
pass
|
|
if u is None:
|
|
raise EnvironmentError('UDisks not available on your system')
|
|
return u
|
|
|
|
|
|
def mount(node_path):
|
|
u = get_udisks1()
|
|
u.mount(node_path)
|
|
|
|
|
|
def eject(node_path):
|
|
u = get_udisks1()
|
|
u.eject(node_path)
|
|
|
|
|
|
def umount(node_path):
|
|
u = get_udisks1()
|
|
u.unmount(node_path)
|
|
|
|
|
|
def test_udisks(ver=None):
|
|
import sys
|
|
dev = sys.argv[1]
|
|
print('Testing with node', dev)
|
|
u = get_udisks(ver=ver)
|
|
print('Using Udisks:', u.__class__.__name__)
|
|
print('Mounted at:', u.mount(dev))
|
|
print('Unmounting')
|
|
u.unmount(dev)
|
|
print('Mounting')
|
|
u.mount(dev)
|
|
print('Ejecting:')
|
|
u.eject(dev)
|
|
|
|
if __name__ == '__main__':
|
|
print('Run test here...')
|
|
# test_udisks()
|