home *** CD-ROM | disk | FTP | other *** search
Wrap
# Source Generated with Decompyle++
# File: in.pyc (Python 2.6)
"""
Thin ctypes wrapper around the libusb 0.1 C API.

The module loads the shared library at import time (best effort), declares
ctypes mirrors of the structures the library exposes, and offers a few helper
functions (`busses`, `get_devices`, `get_device_by_id`) to enumerate devices.
If the library cannot be loaded or initialised, `_libusb` is None and
`has_library()` returns False.
"""

__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'

from ctypes import cdll, POINTER, byref, pointer, Structure as _Structure, \
    c_ubyte, c_ushort, c_int, c_char, c_void_p, c_byte, c_uint
from errno import EBUSY, ENOMEM

from calibre import iswindows, isosx, isfreebsd, load_library

_libusb_name = 'libusb'

# Size used for the fixed-length path buffers in the Bus and Device structures
# (declared below as ``c_char * (PATH_MAX + 1)``).
# NOTE(review): the decompiler reduced the platform branches to ``pass`` and
# left 4096 everywhere.  The per-platform values below follow the buffer sizes
# in the libusb-0.1 / libusb-win32 headers -- confirm against the headers of
# the library that is actually shipped.
if iswindows:
    PATH_MAX = 511
elif isosx or isfreebsd:
    PATH_MAX = 1024
else:
    PATH_MAX = 4096

if iswindows:
    class Structure(_Structure):
        """Base structure for Windows: same as ctypes.Structure, packed to 1 byte."""
        _pack_ = 1
    _libusb_name = 'libusb0'
else:
    Structure = _Structure

# Best-effort load: first through calibre's load_library, then by the
# hard-coded soname.  Any failure leaves _libusb as None.
try:
    try:
        _libusb = load_library(_libusb_name, cdll)
    except OSError:
        _libusb = cdll.LoadLibrary('libusb-0.1.so.4')
except Exception:
    _libusb = None


class DeviceDescriptor(Structure):
    """Field layout of a USB device descriptor."""
    _fields_ = [
        ('Length', c_ubyte),
        ('DescriptorType', c_ubyte),
        ('bcdUSB', c_ushort),
        ('DeviceClass', c_ubyte),
        ('DeviceSubClass', c_ubyte),
        ('DeviceProtocol', c_ubyte),
        ('MaxPacketSize0', c_ubyte),
        ('idVendor', c_ushort),
        ('idProduct', c_ushort),
        ('bcdDevice', c_ushort),
        ('Manufacturer', c_ubyte),
        ('Product', c_ubyte),
        ('SerialNumber', c_ubyte),
        ('NumConfigurations', c_ubyte),
    ]


class EndpointDescriptor(Structure):
    """Field layout of a USB endpoint descriptor."""
    _fields_ = [
        ('Length', c_ubyte),
        ('DescriptorType', c_ubyte),
        ('EndpointAddress', c_ubyte),
        ('Attributes', c_ubyte),
        ('MaxPacketSize', c_ushort),
        ('Interval', c_ubyte),
        ('Refresh', c_ubyte),
        ('SynchAddress', c_ubyte),
        ('extra', POINTER(c_char)),
        ('extralen', c_int),
    ]


class InterfaceDescriptor(Structure):
    """Field layout of a USB interface descriptor."""
    _fields_ = [
        ('Length', c_ubyte),
        ('DescriptorType', c_ubyte),
        ('InterfaceNumber', c_ubyte),
        ('AlternateSetting', c_ubyte),
        ('NumEndpoints', c_ubyte),
        ('InterfaceClass', c_ubyte),
        ('InterfaceSubClass', c_ubyte),
        ('InterfaceProtocol', c_ubyte),
        ('Interface', c_ubyte),
        ('endpoint', POINTER(EndpointDescriptor)),
        ('extra', POINTER(c_char)),
        ('extralen', c_int),
    ]


class Interface(Structure):
    """Pointer to an array of interface descriptors plus its length."""
    _fields_ = [
        ('altsetting', POINTER(InterfaceDescriptor)),
        ('num_altsetting', c_int),
    ]


class ConfigDescriptor(Structure):
    """Field layout of a USB configuration descriptor."""
    _fields_ = [
        ('Length', c_ubyte),
        ('DescriptorType', c_ubyte),
        ('TotalLength', c_ushort),
        ('NumInterfaces', c_ubyte),
        ('Value', c_ubyte),
        ('Configuration', c_ubyte),
        ('Attributes', c_ubyte),
        ('MaxPower', c_ubyte),
        ('interface', POINTER(Interface)),
        ('extra', POINTER(c_ubyte)),
        ('extralen', c_int),
    ]

    def __str__(self):
        """Return one ``name: value`` line per declared field."""
        # getattr replaces the former eval('self.' + name): same value,
        # no dynamic code evaluation.
        lines = (field[0] + ': ' + str(getattr(self, field[0]))
                 for field in self._fields_)
        return '\n'.join(lines).strip()


class Error(Exception):
    """Raised for every failure reported by this module."""
    pass


class Device(Structure):
    """A USB device.  Its ``_fields_`` are assigned further down, once
    `Bus` exists, because the two structures point at each other."""

    def open(self):
        """Open the device and return its `DeviceHandle`.

        Raises `Error` if usb_open returns a null pointer.
        """
        handle = _libusb.usb_open(byref(self))
        if not handle:
            raise Error('Cannot open device')
        return handle.contents

    @property
    def configurations(self):
        """ List of device configurations. See L{ConfigDescriptor} """
        count = self.device_descriptor.NumConfigurations
        return tuple([self.config_descriptor[i] for i in range(count)])


class Bus(Structure):
    """A USB bus.  Its ``_fields_`` are assigned further down, once
    `DeviceHandle` has been declared."""

    @property
    def device_list(self):
        """
        Flat list of devices on this bus.
        Note: children are not explored
        TODO: Check if exploring children is necessary (e.g. with an external hub)
        """
        if _libusb is None:
            return []
        if _libusb.usb_find_devices() < 0:
            raise Error('Unable to search for USB devices')
        found = []
        node = self.devices
        # Walk the linked list through each device's ``next`` pointer; a
        # null pointer is falsy and ends the loop.
        while node:
            dev = node.contents
            found.append(dev)
            node = dev.next
        return found


class DeviceHandle(Structure):
    """Handle for an opened device, as returned by `Device.open`."""
    _fields_ = [
        ('fd', c_int),
        ('bus', POINTER(Bus)),
        ('device', POINTER(Device)),
        ('config', c_int),
        ('interface', c_int),
        ('altsetting', c_int),
        ('impl_info', c_void_p),
    ]

    def close(self):
        """Close this handle via usb_close."""
        _libusb.usb_close(byref(self))

    def set_configuration(self, config):
        """Select a device configuration.

        `config` is either an object with a ``Value`` attribute (such as a
        `ConfigDescriptor`) or the configuration number itself.
        Raises `Error` if the library returns a negative code.
        """
        try:
            num = config.Value
        except AttributeError:
            num = config
        ret = _libusb.usb_set_configuration(byref(self), num)
        if ret < 0:
            raise Error('Failed to set device configuration to: ' + str(num) +
                        '. Error code: ' + str(ret))

    def claim_interface(self, num):
        """Claim interface number `num`.

        Raises `Error` with a specific message when the negated return code
        equals ENOMEM or EBUSY, and a generic one for any other negative code.
        """
        ret = _libusb.usb_claim_interface(byref(self), num)
        if -ret == ENOMEM:
            raise Error('Insufficient memory to claim interface')
        if -ret == EBUSY:
            raise Error('Device busy')
        if ret < 0:
            raise Error('Unknown error occurred while trying to claim USB interface: ' + str(ret))

    def control_msg(self, rtype, request, bytes, value=0, index=0, timeout=100):
        """Perform a control transfer.

        `bytes` selects the direction:

        * an integer -- a zeroed buffer of that many bytes is passed to the
          library and returned; `Error` is raised if the call reports fewer
          bytes than requested.
        * a sequence of byte values -- the values are copied into a buffer,
          passed to the library, and the library's return value is returned.
        """
        try:
            size = len(bytes)
            reading = False
        except TypeError:
            # No len(): `bytes` is the number of bytes to read.
            size = bytes
            reading = True

        ArrayType = c_byte * size
        # argtypes is re-declared on every call because the buffer type
        # depends on the transfer size.
        _libusb.usb_control_msg.argtypes = [
            POINTER(DeviceHandle), c_int, c_int, c_int, c_int,
            POINTER(ArrayType), c_int, c_int]

        if reading:
            arr = ArrayType()
            rsize = _libusb.usb_control_msg(byref(self), rtype, request, value,
                                            index, byref(arr), size, timeout)
            if rsize < size:
                raise Error('Could not read ' + str(size) +
                            ' bytes on the control bus. Read: ' + str(rsize) + ' bytes.')
            return arr

        arr = ArrayType(*bytes)
        return _libusb.usb_control_msg(byref(self), rtype, request, value,
                                       index, byref(arr), size, timeout)

    def bulk_read(self, endpoint, size, timeout=100):
        """Read up to `size` bytes from `endpoint`.

        Returns the whole buffer when `size` bytes were reported, otherwise
        the slice ``arr[:rsize]``.  Raises `Error` on a negative return code
        or when zero bytes are reported.
        """
        ArrayType = c_byte * size
        arr = ArrayType()
        _libusb.usb_bulk_read.argtypes = [
            POINTER(DeviceHandle), c_int, POINTER(ArrayType), c_int, c_int]
        rsize = _libusb.usb_bulk_read(byref(self), endpoint, byref(arr), size, timeout)
        if rsize < 0:
            raise Error('Could not read ' + str(size) +
                        ' bytes on the bulk bus. Error code: ' + str(rsize))
        if rsize == 0:
            raise Error('Device sent zero bytes')
        if rsize < size:
            arr = arr[:rsize]
        return arr

    def bulk_write(self, endpoint, bytes, timeout=100):
        """Send the byte values in `bytes` to `endpoint`.

        The library's return code is not inspected (unchanged from the
        original behaviour).
        """
        size = len(bytes)
        ArrayType = c_byte * size
        arr = ArrayType(*bytes)
        _libusb.usb_bulk_write.argtypes = [
            POINTER(DeviceHandle), c_int, POINTER(ArrayType), c_int, c_int]
        _libusb.usb_bulk_write(byref(self), endpoint, byref(arr), size, timeout)

    def release_interface(self, num):
        """Release interface number `num`; raise `Error` on a negative code."""
        ret = _libusb.usb_release_interface(pointer(self), num)
        if ret < 0:
            raise Error('Unknown error occurred while trying to release USB interface: ' + str(ret))

    def reset(self):
        """Reset the device; raise `Error` on a negative code."""
        ret = _libusb.usb_reset(pointer(self))
        if ret < 0:
            raise Error('Unknown error occurred while trying to reset USB device ' + str(ret))


# Bus and Device reference each other, so their fields can only be declared
# after both classes exist.
Bus._fields_ = [
    ('next', POINTER(Bus)),
    ('previous', POINTER(Bus)),
    ('dirname', c_char * (PATH_MAX + 1)),
    ('devices', POINTER(Device)),
    ('location', c_uint),
    ('root_dev', POINTER(Device)),
]

Device._fields_ = [
    ('next', POINTER(Device)),
    ('previous', POINTER(Device)),
    ('filename', c_char * (PATH_MAX + 1)),
    ('bus', POINTER(Bus)),
    ('device_descriptor', DeviceDescriptor),
    ('config_descriptor', POINTER(ConfigDescriptor)),
    ('dev', c_void_p),
    ('devnum', c_ubyte),
    ('num_children', c_ubyte),
    ('children', POINTER(POINTER(Device))),
]

if _libusb is not None:
    try:
        _libusb.usb_get_busses.restype = POINTER(Bus)
        _libusb.usb_open.restype = POINTER(DeviceHandle)
        _libusb.usb_open.argtypes = [POINTER(Device)]
        _libusb.usb_close.argtypes = [POINTER(DeviceHandle)]
        _libusb.usb_claim_interface.argtypes = [POINTER(DeviceHandle), c_int]
        _libusb.usb_claim_interface.restype = c_int
        _libusb.usb_release_interface.argtypes = [POINTER(DeviceHandle), c_int]
        _libusb.usb_release_interface.restype = c_int
        _libusb.usb_reset.argtypes = [POINTER(DeviceHandle)]
        _libusb.usb_reset.restype = c_int
        _libusb.usb_control_msg.restype = c_int
        _libusb.usb_bulk_read.restype = c_int
        _libusb.usb_bulk_write.restype = c_int
        _libusb.usb_set_configuration.argtypes = [POINTER(DeviceHandle), c_int]
        _libusb.usb_set_configuration.restype = c_int
        _libusb.usb_init()
    except Exception:
        # The decompiler dropped this handler, which made the reset below
        # unconditional.  It must only run when setting up the prototypes
        # or usb_init() fails (e.g. a symbol is missing).
        _libusb = None


def busses():
    """Return the list of `Bus` objects currently known to the library.

    Raises `Error` if the library is unavailable or if searching for busses
    or devices reports a negative code.
    """
    if _libusb is None:
        raise Error('Could not find libusb.')
    if _libusb.usb_find_busses() < 0:
        raise Error('Unable to search for USB busses')
    if _libusb.usb_find_devices() < 0:
        raise Error('Unable to search for USB devices')
    found = []
    node = _libusb.usb_get_busses()
    # Follow the linked list of busses until the null pointer.
    while node:
        bus = node.contents
        found.append(bus)
        node = bus.next
    return found


def get_device_by_id(idVendor, idProduct):
    """Return the first device whose descriptor matches both ids, else None."""
    for bus in busses():
        for dev in bus.device_list:
            desc = dev.device_descriptor
            if desc.idVendor == idVendor and desc.idProduct == idProduct:
                return dev
    return None


def has_library():
    """Return True if libusb was loaded and initialised successfully."""
    return _libusb is not None


def get_devices():
    """Return ``(idVendor, idProduct, bcdDevice)`` for every device on every bus."""
    return [
        (dev.device_descriptor.idVendor,
         dev.device_descriptor.idProduct,
         dev.device_descriptor.bcdDevice)
        for bus in busses()
        for dev in bus.device_list
    ]