from typing import Optional
from kivy.event import EventDispatcher
from kivy.logger import Logger
from able import WriteType
from able.queue import BLEQueue, ble_task, ble_task_done
from able.utils import force_convertible_to_java_array
class BLEError(object):
"""raise Exception on attribute access
"""
def __init__(self, msg):
self.msg = msg
def __getattr__(self, name):
raise Exception(self.msg)
class BluetoothDispatcherBase(EventDispatcher):
__events__ = (
'on_device', 'on_scan_started', 'on_scan_completed', 'on_services',
'on_connection_state_change', 'on_characteristic_changed',
'on_characteristic_read', 'on_characteristic_write',
'on_descriptor_read', 'on_descriptor_write',
'on_gatt_release', 'on_error', 'on_rssi_updated', 'on_mtu_changed',
)
queue_class = BLEQueue
def __init__(self, queue_timeout=0.5, enable_ble_code=0xab1e):
super(BluetoothDispatcherBase, self).__init__()
self.queue_timeout = queue_timeout
self.enable_ble_code = enable_ble_code
self._remote_device_address = None
self._set_ble_interface()
self._set_queue()
def _set_ble_interface(self):
self._ble = BLEError('BLE is not implemented for platform')
def _set_queue(self):
self.queue = self.queue_class(timeout=self.queue_timeout)
def _check_runtime_permissions(self):
return True
def _request_runtime_permissions(self):
pass
@property
def gatt(self):
"""GATT profile of the connected device
:type: BluetoothGatt Java object
"""
return self._ble.getGatt()
@property
def bonded_devices(self):
"""List of Java `android.bluetooth.BluetoothDevice` objects of paired BLE devices.
:type: List[BluetoothDevice]
"""
return []
def set_queue_timeout(self, timeout):
"""Change the BLE operations queue timeout
"""
self.queue_timeout = timeout
self.queue.set_timeout(timeout)
def start_scan(self):
"""Start a scan for devices.
Ask for runtime permission to access location.
Start a system activity that allows the user to turn on Bluetooth,
if Bluetooth is not enabled.
The status of the scan start are reported with
:func:`scan_started <on_scan_started>` event.
"""
self._remote_device_address = None
if self._check_runtime_permissions():
self._ble.startScan(self.enable_ble_code)
else:
self._request_runtime_permissions()
def stop_scan(self):
"""Stop the ongoing scan for devices.
"""
self._ble.stopScan()
def connect_by_device_address(self, address: str):
"""Connect to GATT Server of the device with a given Bluetooth hardware address, without scanning.
Start a system activity that allows the user to turn on Bluetooth if Bluetooth is not enabled.
:param address: Bluetooth hardware address string in "XX:XX:XX:XX:XX:XX" format
:raises:
ValueError: if `address` is not a valid Bluetooth address
"""
pass
def connect_gatt(self, device):
"""Connect to GATT Server hosted by device
"""
self._ble.connectGatt(device)
def close_gatt(self):
"""Close current GATT client
"""
self._ble.closeGatt()
def discover_services(self):
"""Discovers services offered by a remote device.
The status of the discovery reported with
:func:`services <on_services>` event.
:return: true, if the remote services discovery has been started
"""
return self.gatt.discoverServices()
def enable_notifications(self, characteristic, enable=True, indication=False):
"""Enable/disable notifications or indications for a given characteristic
:param characteristic: BluetoothGattCharacteristic Java object
:param enable: enable notifications if True, else disable notifications
:param indication: handle indications instead of notifications
:return: True, if the operation was initiated successfully
"""
return True
@ble_task
def write_descriptor(self, descriptor, value):
"""Set and write the value of a given descriptor to the associated
remote device
:param descriptor: BluetoothGattDescriptor Java object
:param value: value to write
"""
if not descriptor.setValue(force_convertible_to_java_array(value)):
Logger.error("Error on set descriptor value")
return
if not self.gatt.writeDescriptor(descriptor):
Logger.error("Error on descriptor write")
@ble_task
def write_characteristic(self, characteristic, value, write_type: Optional[WriteType] = None):
"""Write a given characteristic value to the associated remote device
:param characteristic: BluetoothGattCharacteristic Java object
:param value: value to write
:param write_type: specific write type to set for the characteristic
"""
self._ble.writeCharacteristic(
characteristic,
force_convertible_to_java_array(value),
int(write_type or 0)
)
@ble_task
def read_characteristic(self, characteristic):
"""Read a given characteristic from the associated remote device
:param characteristic: BluetoothGattCharacteristic Java object
"""
self._ble.readCharacteristic(characteristic)
@ble_task
def update_rssi(self):
"""Triggers an update for the RSSI from the associated remote device
"""
self._ble.readRemoteRssi()
@ble_task
def request_mtu(self, mtu: int):
"""Request to change the ATT Maximum Transmission Unit value
:param value: new MTU size
"""
self.gatt.requestMtu(mtu)
def on_error(self, msg):
"""Error handler
:param msg: error message
"""
self._ble = BLEError(msg) # Exception for calls from another threads
raise Exception(msg)
@ble_task_done
def on_gatt_release(self):
"""`gatt_release` event handler.
Event is dispatched at every read/write completed operation
"""
pass
def on_scan_started(self, success):
"""`scan_started` event handler
:param success: true, if scan was started successfully
"""
pass
def on_scan_completed(self):
"""`scan_completed` event handler
"""
pass
def on_device(self, device, rssi, advertisement):
"""`device` event handler.
Event is dispatched when device is found during a scan.
:param device: BluetoothDevice Java object
:param rssi: the RSSI value for the remote device
:param advertisement: :class:`Advertisement` data record
"""
pass
def on_connection_state_change(self, status, state):
"""`connection_state_change` event handler
:param status: status of the operation,
`GATT_SUCCESS` if the operation succeeds
:param state: STATE_CONNECTED or STATE_DISCONNECTED
"""
pass
def on_services(self, services, status):
"""`services` event handler
:param services: :class:`Services` dict filled with discovered
characteristics
(BluetoothGattCharacteristic Java objects)
:param status: status of the operation,
`GATT_SUCCESS` if the operation succeeds
"""
pass
def on_characteristic_changed(self, characteristic):
"""`characteristic_changed` event handler
:param characteristic: BluetoothGattCharacteristic Java object
"""
pass
def on_characteristic_read(self, characteristic, status):
"""`characteristic_read` event handler
:param characteristic: BluetoothGattCharacteristic Java object
:param status: status of the operation,
`GATT_SUCCESS` if the operation succeeds
"""
pass
def on_characteristic_write(self, characteristic, status):
"""`characteristic_write` event handler
:param characteristic: BluetoothGattCharacteristic Java object
:param status: status of the operation,
`GATT_SUCCESS` if the operation succeeds
"""
pass
def on_descriptor_read(self, descriptor, status):
"""`descriptor_read` event handler
:param descriptor: BluetoothGattDescriptor Java object
:param status: status of the operation,
`GATT_SUCCESS` if the operation succeeds
"""
pass
def on_descriptor_write(self, descriptor, status):
"""`descriptor_write` event handler
:param descriptor: BluetoothGattDescriptor Java object
:param status: status of the operation,
`GATT_SUCCESS` if the operation succeeds
"""
pass
def on_rssi_updated(self, rssi, status):
"""`onReadRemoteRssi` event handler.
Event is dispatched at every RSSI update completed operation,
reporting a RSSI value for a remote device connection.
:param rssi: integer containing RSSI value in dBm
:param status: status of the operation,
`GATT_SUCCESS` if the operation succeeds
"""
pass
def on_mtu_changed(self, mtu, status):
"""`onMtuChanged` event handler
Event is dispatched when MTU for a remote device has changed,
reporting a new MTU size.
:param mtu: integer containing the new MTU size
:param status: status of the operation,
`GATT_SUCCESS` if the MTU has been changed successfully
"""
pass