#
# Copyright 2019 Amazon.com, Inc. or its affiliates.  All Rights Reserved.
# These materials are licensed under the Amazon Software License in connection with the Alexa Gadgets Program.
# The Agreement is available at https://aws.amazon.com/asl/.
# See the Agreement for the specific terms and conditions of the Agreement.
# Capitalized terms not defined in this file have the meanings given to them in the Agreement.
#

import binascii
import dbus
import dbus.service
import logging.config
import select
import subprocess
import threading
import uuid

from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GObject

from agt.base_adapter import BaseAdapter
from agt.base_adapter import BUS_NAME, ADAPTER_INTERFACE, DBUS_OM_IFACE, DEVICE_INTERFACE

import bluetooth

logger = logging.getLogger(__name__)

#
# Constants (SPP packet framing)
#
_STX = 0xF0  # Start transmission
_ETX = 0xF1  # End transmission.
_ESC = 0xF2  # Escape character to allow reserved characters.
_RESERVED = [_STX, _ETX, _ESC]
# Indicate no error.
_ERR = 0x00
# Hardcoded per documentation.
_CMD = 0x02
_SPP_CHANNEL = 4

#
# BlueZ constants
#
BLUEZ_AGENT_PATH = '/org/bluez/agent'
AGENT_INTERFACE = 'org.bluez.Agent1'
AGENT_MANAGER_INTERFACE = 'org.bluez.AgentManager1'
PROFILE_MANAGER_INTERFACE = 'org.bluez.ProfileManager1'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
IO_CAPABILITY = 'NoInputNoOutput'


def _hciconfig(args):
    """
    Run ``hciconfig`` through sudo with the given arguments.

    :param args: List of command-line arguments appended after the binary.
    :return: The ``subprocess.CompletedProcess``; stdout is captured.
    """
    return subprocess.run(['/usr/bin/sudo', '/bin/hciconfig'] + args, stdout=subprocess.PIPE)


def _sdptool(args):
    """
    Run ``sdptool`` through sudo with the given arguments.

    :param args: List of command-line arguments appended after the binary.
    :return: The ``subprocess.CompletedProcess``; stdout is captured.
    """
    return subprocess.run(['/usr/bin/sudo', '/usr/bin/sdptool'] + args, stdout=subprocess.PIPE)


class BluetoothAdapter(dbus.service.Object):
    """
    BluetoothAdapter bluetooth interface for Alexa Gadgets application
    """

    def __init__(self, gadget_friendly_name, gadget_vendor_id, gadget_product_id,
                 spp_data_handler_cb, on_connection_cb, on_disconnection_cb):
        """
        Create connections.

        :param gadget_friendly_name: Name set on the controller and advertised in the EIR.
        :param gadget_vendor_id: Vendor ID, concatenated as-is into the EIR hex string.
        :param gadget_product_id: Product ID, concatenated as-is into the EIR hex string.
        :param spp_data_handler_cb: Callback for raw spp data.
        :param on_connection_cb: Callback when connection is up
        :param on_disconnection_cb: Callback when connection is down
        """
        # NOTE(review): dbus.service.Object.__init__ is not invoked for this class;
        # only the wrapped _BlueZAPI instance is exported on the bus - confirm intended.
        # initialize BlueZAPI
        self._bluez_api = _BlueZAPI()

        # initialize RFCOMM server (listens on RFCOMM channel _SPP_CHANNEL)
        self._spp_server = _RFCOMMServer(_SPP_CHANNEL, spp_data_handler_cb,
                                         on_connection_cb, on_disconnection_cb)

        self._gadget_friendly_name = gadget_friendly_name
        self._gadget_vendor_id = gadget_vendor_id
        self._gadget_product_id = gadget_product_id

    @staticmethod
    def get_address():
        """
        Gets the BD Address of the host

        :return: Host BD Address with the ':' separators removed.
        """
        p = _hciconfig(['hci0'])
        # Take the token following 'BD Address: ' in the hciconfig output.
        bdaddr = p.stdout.decode('utf-8').split('BD Address: ')[1].split(' ')[0]
        bdaddr = bdaddr.replace(':', '').strip()
        return bdaddr

    def start_server(self):
        """
        Start the connection, registering as a gadget.
        """
        _create_service_records()
        self._spp_server.start()

    def stop_server(self):
        """
        Stop the server
        """
        self.disconnect()
        self._bluez_api.stop_dbus()

    def poll_server(self):
        """
        Check the servers for connection/data.
        """
        self._spp_server.poll()

    def send(self, data):
        """
        Send data to a server.

        :param data: Payload to wrap in an SPP packet and queue for sending.
        """
        # Generate SPP packet before sending over SPP server
        packet = _SPPPacket()
        packet.payload = data
        self._spp_server.send(packet.get())

    def set_discoverable(self, discoverable):
        """
        Turn on/off discoverability.

        :param discoverable: On/Off for discoverable.
        """
        if discoverable:
            self._bluez_api.start_inbound_pairing_mode(self._gadget_friendly_name, self._create_eir())
        else:
            self._bluez_api.stop_inbound_pairing_mode()

    def disconnect(self):
        """
        Disconnect the current SPP connection, if any.
        """
        self._spp_server.disconnect()

    def is_connected(self):
        """
        Report whether connection is active.

        :return: Boolean whether connection is active.
        """
        return self._spp_server.is_connected()

    def get_connection_info(self):
        """
        Get information on the current connection.

        :return: Whatever the SPP server reports: the connection info, or (None, None)
                 when not connected.
        """
        return self._spp_server.get_connection_info()

    def reconnect(self, bdaddr):
        """
        Reconnect to a bdaddr specified by bdaddr.

        :param bdaddr: Address to reconnect to.
        """
        # Issues an SDP search for 0x1101 against the peer.
        _sdptool(['search', '--bdaddr', bdaddr, '0x1101'])

    def is_paired_to_address(self, bd_addr):
        """
        Report whether the host is paired to the given address (delegates to the BlueZ wrapper).
        """
        return self._bluez_api.is_paired_to_address(bd_addr)

    def unpair(self, bd_addr):
        """
        Remove the pairing with the given address (delegates to the BlueZ wrapper).
        """
        self._bluez_api.unpair(bd_addr)

    def run(self):
        """
        Run the D-Bus main loop; returns once the loop is quit.
        """
        self._bluez_api.run_dbus()

    def _create_eir(self):
        """
        Create extended inquiry response.

        :return: The EIR as a hex string.
        """
        # EIR length fields count bytes, so measure the UTF-8 encoding rather than
        # the character count (identical for ASCII names).
        name_bytes = self._gadget_friendly_name.encode('utf8')

        # Length, address, and actual name
        eir = '{0:0{1}x}'.format(1 + len(name_bytes), 2)
        eir += '09'
        eir += binascii.hexlify(name_bytes).decode('utf-8')

        # Length, address, and actual UUID
        eir += '11'
        # Address for UUID data,
        eir += '06'
        eir += 'B7166825D15A949FED4E3A98B3D28860'

        # Length, address, and actual manufacturer data.
        eir += '0b'
        eir += 'ff'
        # Vendor ID, Product ID, Amazon SIG VID, Amazon Gadget UUID
        # NOTE(review): the fixed length 0x0b presumes vendor and product IDs are
        # 4 hex characters each - confirm against callers.
        eir += self._gadget_vendor_id
        eir += self._gadget_product_id
        eir += '7101'
        eir += '101515fe'

        # End of data
        eir += '00'
        return eir


class _RFCOMMServer:
    """
    RFCOMM server using pybluez
    """

    def __init__(self, channel, data_handler_cb, on_connected_cb, on_disconnected_cb):
        """
        Initializer a single server.

        :param channel: Channel to connect.
        :param data_handler_cb: Data sink.
        :param on_connected_cb: Connection success.
        :param on_disconnected_cb: Disconnection.
        """
        self._data_handler_cb = data_handler_cb
        self._on_connected_cb = on_connected_cb
        self._on_disconnected_cb = on_disconnected_cb
        self._channel = channel
        self._server = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
        self._socket = None
        self._info = None
        self._send_queue = bytearray()
        self._send_queue_available = threading.Condition()
        self._spp_parser = _Parser(self._data_handler_cb)

    def start(self):
        """
        Start the server.
        """
        self._server.bind(('', self._channel))
        self._server.listen(1)

    def send(self, data):
        """
        Send data, it is thread safe although there isn't any multi-threading.

        :param data: Data to append, None will clear queue.
        """
        with self._send_queue_available:
            if data is None:
                self._send_queue = bytearray()
            elif self.is_connected():
                # Data offered while disconnected is dropped.
                self._send_queue += data

    def poll(self):
        """
        Check the server for a connection or data.
        """
        if self.is_connected():
            self._poll_read()
            self._poll_write()
        else:
            self._poll_connect()

    def is_connected(self):
        """
        Report whether server is connected.

        :return: True/False if server is connected.
        """
        return self._socket is not None

    def disconnect(self):
        """
        Close and disconnect socket.
        """
        if self._socket:
            # Remember the peer before clearing state so the callback can report it.
            prev_info = self._info
            self._socket.close()
            self._socket = None
            self._info = None
            self._on_disconnected_cb(prev_info[0])

    def get_connection_info(self):
        """
        Get information on the current connection.

        :return: A connection tuple of (address, channel) or (None, None) if not connected.
        """
        if self.is_connected():
            return self._info
        else:
            return None, None

    def _poll_connect(self):
        """
        Poll the server to see if a connection is accepted.
        """
        # Zero timeout: never block the caller's polling loop.
        available, _, _ = select.select([self._server], [], [], 0)
        if len(available):
            self._connect()

    def _connect(self):
        """
        Accept the pending connection, clear the send queue and notify the callback.
        """
        self._socket, self._info = self._server.accept()
        self.send(None)
        self._on_connected_cb(self._info[0])

    def _poll_read(self):
        """
        Poll a connection to see if data is available or needs to be sent.
        """
        if self._socket is not None:
            readable, _, _ = select.select([self._socket], [], [], 0)
            if len(readable):
                self._read()

    def _read(self):
        """
        Read up to 1024 bytes and feed them to the parser; disconnect when nothing was read.
        """
        data = bytes()
        try:
            data = self._socket.recv(1024)
        except bluetooth.btcommon.BluetoothError as e:
            # Indicates a broken connection.
            logger.debug('Bluetooth connection broken: {}'.format(e))
        finally:
            # Runs for every outcome: empty data (including after an error) means disconnect.
            if len(data):
                self._spp_parser.parse(data)
            else:
                self.disconnect()

    def _poll_write(self):
        """
        Poll a connection to see if it is writable, and flush the send queue if so.
        """
        if self._socket is not None:
            _, writable, _ = select.select([], [self._socket], [], 0)
            if len(writable):
                self._write()

    def _write(self):
        """
        Send as much of the queue as the socket accepts and drop the sent prefix.
        """
        with self._send_queue_available:
            count = 0
            try:
                if len(self._send_queue):
                    count = self._socket.send(bytes(self._send_queue))
            except bluetooth.btcommon.BluetoothError as e:
                # Indicates a broken connection.
                logger.debug('Bluetooth connection broken: {}'.format(e))
                self.disconnect()
            finally:
                # count stays 0 when nothing was sent, leaving the queue untouched.
                self._send_queue = self._send_queue[count:]


class _Parser:
    """
    The Parser class will handle the standard SPP packet.
    """

    def __init__(self, payload_cb):
        """
        Initialize

        :param payload_cb: Callback to send a completed payload.
        """
        self._payload_cb = payload_cb
        self._data = bytearray()
        self._state = self._state_find_stx
        self._packet = None

    def parse(self, data):
        """
        Parse incoming data, will call the payload cb when a packet is found.

        :param data: New data.
        """
        self._data += bytearray(data)
        # Feed one byte at a time to whichever state handler is current.
        while len(self._data):
            self._state(self._data.pop(0))

    def _start_packet(self):
        """
        Begin a new packet.
        """
        self._packet = _SPPPacket()
        self._state = self._state_get_command_id

    def _state_find_stx(self, c):
        """
        Find start transmission (STX) character. Followed by state_get_command_id()

        :param c: Char to parse
        """
        if c == _STX:
            self._start_packet()

    def _state_get_command_id(self, c):
        """
        Accept command id character. Followed by state_get_error_id().
        Will drop back to start if STX is found.

        :param c: Char to parse
        """
        if c == _STX:
            self._start_packet()
        else:
            self._packet.command_id = c
            self._state = self._state_get_error_id

    def _state_get_error_id(self, c):
        """
        Accept error id character. Followed by state_get_seq_id().
        Will drop back to start if STX is found.

        :param c: Char to parse
        """
        if c == _STX:
            self._start_packet()
        else:
            self._packet.error_id = c
            self._state = self._state_get_seq_id

    def _state_get_seq_id(self, c):
        """
        Accept sequence id character. Followed by state_get_data().
        Will drop back to start if STX is found.

        :param c: Char to parse
        """
        if c == _STX:
            self._start_packet()
        elif c == _ESC:
            self._state = self._state_get_seq_id_escaped
        else:
            self._packet.sequence_id = c
            self._state = self._state_get_data

    def _state_get_seq_id_escaped(self, c):
        """
        Accept a single escaped character (seq_id can also be escaped)

        :param c: Char to parse
        """
        self._packet.sequence_id = _ESC ^ c
        self._state = self._state_get_data

    def _state_get_data(self, c):
        """
        Accept payload. This could be an escaped character (state_get_escaped()
        Will drop back to start if STX is found, and validate checksum and send
        payload if ETX is found.

        :param c: Char to parse
        """
        if c == _STX:
            self._start_packet()
        elif c == _ESC:
            self._state = self._state_get_escaped
        elif c == _ETX:
            # Last two bytes are checksum
            if len(self._packet.payload) >= 2:
                # First pop is the low byte, second pop is the high byte.
                found_checksum = self._packet.payload.pop() + (self._packet.payload.pop() << 8)
                # Then get actual
                calc_checksum = self._packet._calc_checksum()
                if found_checksum == calc_checksum:
                    self._payload_cb(self._packet.payload)
            # Packet finished (valid or not): wait for the next STX.
            self._state = self._state_find_stx
        else:
            self._packet.payload.append(c)

    def _state_get_escaped(self, c):
        """
        Accept a single escaped character.

        :param c: Char to parse
        """
        self._packet.payload.append(_ESC ^ c)
        self._state = self._state_get_data


def _create_service_records():
    """
    Create BT service records.
    """
    _bus = dbus.SystemBus().get_object(BUS_NAME, '/org/bluez')
    _manager = dbus.Interface(_bus, PROFILE_MANAGER_INTERFACE)
    _opts = {
        'Role': 'server',
        'RequireAuthentication': False,
        'RequireAuthorization': False
    }

    def _create_record(path, xml_record):
        """
        Register a single BT service.
        """
        _opts['ServiceRecord'] = xml_record
        _manager.RegisterProfile(path, str(uuid.uuid4()), _opts)

    # NOTE(review): the SDP XML record literals below are empty in this copy of the
    # file; with an empty template the '%' formatting in _create_channel_xml raises
    # TypeError. Restore the XML text from the upstream source before use.

    # Primary gadget record
    _create_record('/bluez5', """ """)

    # This is for the two SPP channels
    def _create_channel_xml(service_uuid, channel):
        """
        Create XML string for a single record.

        :return: XML record
        """
        return """ """ % (service_uuid, channel, service_uuid)

    # Create primary service record
    _create_record('/bluez3', _create_channel_xml('0x1201', _SPP_CHANNEL))


class _SPPPacket:
    """
    A single SPP packet: header (STX, command id, error id, sequence id), escaped
    payload plus 16-bit checksum, and ETX.
    """

    # Next sequence id to hand out; shared by all packets.
    _SEQ_ID = 0

    def __init__(self):
        """
        Create SPP packet.
        """
        self.payload = bytearray()
        self.command_id = None
        self.error_id = None
        # Filled in by the parser for received packets.
        self.sequence_id = None

    def get(self):
        """
        Create a full packet from payload.

        :return: bytearray holding the header, the escaped payload and checksum, and ETX.
        """
        header = self._get_header()
        self.command_id = _CMD
        self.error_id = _ERR
        checksum = self._calc_checksum()
        # Checksum goes on the wire big-endian, and is escaped like the payload.
        payload_to_escape = self.payload + bytearray([checksum >> 8, checksum & 0xFF])
        payload_escaped = bytearray()
        for b in payload_to_escape:
            if b in _RESERVED:
                payload_escaped.append(_ESC)
                payload_escaped.append(_ESC ^ b)
            else:
                payload_escaped.append(b)
        payload_escaped.append(_ETX)
        return header + payload_escaped

    def _calc_header_checksum(self):
        """
        Calculate the header part of the checksum (command id plus error id).
        """
        return self.command_id + self.error_id

    def _get_header(self):
        """
        Build the packet header; consumes one sequence id.
        """
        return bytearray([_STX, _CMD, _ERR, self._get_sequence_id()])

    def _calc_checksum(self):
        """
        Calculate the payload checksum.

        :return: Sum of payload bytes plus the header checksum, truncated to 16 bits.
        """
        payload_sum = sum(self.payload)
        checksum = payload_sum + self._calc_header_checksum()
        return checksum & 0xFFFF

    @staticmethod
    def _get_sequence_id():
        """
        Create a sequence id.

        :return: Sequence id.
        """
        retval = _SPPPacket._SEQ_ID
        # Advance to the next id, wrapping at 8 bits and skipping reserved bytes.
        while True:
            _SPPPacket._SEQ_ID += 1
            _SPPPacket._SEQ_ID &= 0xFF
            if _SPPPacket._SEQ_ID not in _RESERVED:
                break
        return retval


class _BlueZAPI(dbus.service.Object, BaseAdapter):
    """
    A python wrapper for BlueZ dbus APIs
    """

    def __init__(self):
        """
        Configure the controller, set up the GLib main loop and export the agent object.
        """
        # sspmode (Simple Secure Pairing Mode) should always be 1.
        # 0 indicates the legacy pairing using pin code.
        _hciconfig(['hci0', 'sspmode', '1'])

        # initialize Mainloop
        DBusGMainLoop(set_as_default=True)

        # initialize agent interface
        self._loop = GObject.MainLoop()
        self._bus = dbus.SystemBus()
        dbus.service.Object.__init__(self, self._bus, BLUEZ_AGENT_PATH)
        BaseAdapter.__init__(self, self._bus, dbus)

        # initialize bluez properties
        self._bluez_properties = dbus.Interface(
            self._bus.get_object(BUS_NAME, self.bluez_adapter.object_path), DBUS_PROP_IFACE)

        # initialize bluez agent manager
        self._bluez_agent_manager = dbus.Interface(
            self._bus.get_object(BUS_NAME, '/org/bluez'), AGENT_MANAGER_INTERFACE)

    @dbus.service.method(AGENT_INTERFACE, in_signature="os", out_signature="")
    def AuthorizeService(self, device, uuid):
        """
        Agent callback: accept the service by marking the device trusted.
        """
        logger.debug("Authroize Service (%s, %s) of peer device" % (device, uuid))
        self._trustDevice(device)

    @dbus.service.method(AGENT_INTERFACE, in_signature="os", out_signature="")
    def DisplayPinCode(self, device, pincode):
        """
        Agent callback: only logs an error (wrong IO capability).
        """
        logger.error("Error: wrong IO Capability used: DisplayPinCode: (%s, %s)" % (device, pincode))

    @dbus.service.method(AGENT_INTERFACE, in_signature="ouq", out_signature="")
    def DisplayPasskey(self, device, passkey, entered):
        """
        Agent callback: only logs an error (wrong IO capability).
        """
        logger.error("Error: wrong IO Capability used: DisplayPasskey: (%s, %06u entered %u)" %
                     (device, passkey, entered))

    @dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="s")
    def RequestPinCode(self, device):
        """
        Agent callback: logs an error and returns an empty PIN code.
        """
        logger.error("Error: wrong IO Capability used: RequestPinCode: (%s)" % (device))
        return ""

    @dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="u")
    def RequestPasskey(self, device):
        """
        Agent callback: logs an error and returns passkey 0.
        """
        logger.error("Error: wrong IO Capability used: RequestPasskey returns 0")
        return dbus.UInt32(0)

    @dbus.service.method(AGENT_INTERFACE, in_signature="ou", out_signature="")
    def RequestConfirmation(self, device, passkey):
        """
        Agent callback: confirm the passkey and mark the device trusted.
        """
        # Always confirm without asking
        logger.debug("RequestConfirmation (%s)" % (device))
        self._trustDevice(device)

    @dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="")
    def RequestAuthorization(self, device):
        """
        Agent callback: authorize the incoming pairing.
        """
        # Always authorize without asking
        logger.debug("RequestAuthorization (%s)" % (device))

    @dbus.service.method(AGENT_INTERFACE, in_signature="", out_signature="")
    def Cancel(self):
        """
        Agent callback: only logs the cancellation.
        """
        logger.info("Canel Pairing")

    def run_dbus(self):
        """
        Run the GLib main loop until stop_dbus() is called.
        """
        self._loop.run()

    def stop_dbus(self):
        """
        Quit the GLib main loop.
        """
        self._loop.quit()

    def _trustDevice(self, path):
        """
        Mark the remote device as trusted.

        :param path: D-Bus object path of the device, as passed to the agent callbacks.
        """
        # The Trusted property belongs to the device object, not the adapter, so the
        # Properties interface has to be built on the device's own object path.
        device_properties = dbus.Interface(self._bus.get_object(BUS_NAME, path), DBUS_PROP_IFACE)
        device_properties.Set(DEVICE_INTERFACE, "Trusted", True)

    def start_inbound_pairing_mode(self, friendly_name, eir):
        """
        Start inbound pairing mode using friendly name and EIR.
        Inbound means this device is passively being paired by another device.

        :param friendly_name: Name to set on the controller.
        :param eir: Extended inquiry response data as a hex string.
        """
        # hci commands to configure EIR
        _hciconfig(['hci0', 'reset'])
        _hciconfig(['hci0', 'name', friendly_name])
        # mode 2 means inq with EIR
        _hciconfig(['hci0', 'inqmode', '2'])
        _hciconfig(['hci0', 'inqdata', eir])
        # piscan means both page scan and inquire scan
        _hciconfig(['hci0', 'piscan'])

        # btm commands to configure pairing mode
        self._bluez_agent_manager.RegisterAgent(BLUEZ_AGENT_PATH, IO_CAPABILITY)
        self._bluez_agent_manager.RequestDefaultAgent(BLUEZ_AGENT_PATH)
        self._bluez_properties.Set(ADAPTER_INTERFACE, 'Pairable', True)
        self._bluez_properties.Set(ADAPTER_INTERFACE, 'Discoverable', True)

    def stop_inbound_pairing_mode(self):
        """
        Leave pairing mode: clear Discoverable and Pairable, then disable scanning.
        """
        self._bluez_properties.Set(ADAPTER_INTERFACE, 'Discoverable', False)
        self._bluez_properties.Set(ADAPTER_INTERFACE, 'Pairable', False)
        _hciconfig(['hci0', 'noscan'])

    def is_paired_to_address(self, bd_addr):
        """
        Report whether the host is paired to bd_addr (delegates to the parent class).
        """
        return super(_BlueZAPI, self).is_paired_to_address(bd_addr)

    def unpair(self, bd_addr):
        """
        Remove the pairing with bd_addr (delegates to the parent class).
        """
        super(_BlueZAPI, self).unpair(bd_addr)