Intro
Messy code can derail a project. It can lead to spending an excessive amount of time on things like troubleshooting and maintenance. Writing clean and reusable code allows programmers to keep moving the project forward as quickly as possible.
This post is the second installment of a series of posts I'm putting together about Python and Bleak. If you're a little lost, you might want to check out the first post here.
In this blog, I'm going to focus on keeping your code clean by creating a Python class for your device. Here's a quick introduction to some of the terms.
- Bleak - A Python library for Bluetooth devices. Python does not have any built in ability to send Bluetooth messages. In order to communicate with a Bluetooth device using Python, you need to import a library. Bleak uses your computer's internal hardware to communicate with Bluetooth devices. You do not need to setup your computer with any special configuration settings. Python takes care of most of the hard stuff for us.
- Library - Python libraries are collections of pre-written code and functions that extend the capabilities of the Python programming language.
- Class - In Python, a class is a separate set of code that's meant to be reused. Often, a class is kept in a separate file and imported into the main program.
- Device - This is the gadget that's sitting on your desk. It could be a pair of headphones, a devkit, or a full blown prototype. We want to use Python and Bleak to communicate with this device.
Classes are a key tool for keeping code readable. By creating a separate class for our device, the main program can keep some of the messy details hidden. In the end, you'll be able to read your program like a book. Here's what the final code will look like.
import asyncio
import deviceble
async def main():
device = deviceble.DeviceBle()
try:
await device.connect()
battery_level = await device.read_battery_level()
await device.disconnect()
except Exception as e:
print(e)
asyncio.run(main())
Before we start: Wavecake
If you're here to automate a Bluetooth process using Python, now's the time to point out that you could be using Wavecake instead. I built Wavecake to make it easier to share Bluetooth automations amongst teams. It's in a free Beta release. I'm actively seeking feedback. Please feel free to give it a shot and pass along your two cents!
Combine Discovery and Connection
I think one of the better ways to hide the complexity of using Bleak for a device class is to combine the discovery and the connection steps into a single connect call. The discovery step finds the device. The connection step initiates the connection. With Bluetooth, you always need to do both, so why not combine them?
Side note: When using Bleak, there's a temptation to incorporate the device mac address into the user flow. However, it's not a best practice. Most Bluetooth APIs (Apple's CoreBluetooth in particular) do not let you access the mac address. It's best to get used to discovering the device so that your process will scale.
import asyncio
from bleak import BleakScanner, BleakClient
class DeviceBle():
def __init__(self):
self.client = None
self.uuid_battery_service = '0000180f-0000-1000-8000-00805f9b34fb'
self.uuid_battery_level_characteristic = '00002a19-0000-1000-8000-00805f9b34fb'
async def discover(self):
devices = await BleakScanner.discover(5.0, return_adv=True)
for device in devices:
advertisement_data = devices[device][1]
if(advertisement_data.local_name == "DeviceName"):
if(advertisement_data.rssi > -90):
self.device = devices[device]
return device
async def connect(self):
address = await self.discover()
if address is not None:
try:
print("Found device at address: %s" % (address))
print("Attempting to connect...")
self.client = BleakClient(address)
await self.client.connect()
print("Connected")
except:
raise Exception("Failed to connect")
else:
raise Exception("Did not find available devices")
async def disconnect(self):
try:
print("Disconnecting...")
await self.client.disconnect()
print("Disconnected!")
except:
raise Exception("Warning: Failed to disconnect. Check for hanging connection")
I went ahead and threw in a function for disconnect as well so that we can fully test this code after putting it together.
This code uses a pretty simple discovery routine. First, it looks for devices for 5 seconds. It then searches through the list for devices that match the specified name. If you're like me, you might have a lot of devices with the same name lying around your workbench. To prevent any confusion, I added an RSSI requirement. If the device is close, with an RSSI of greater than -90, then the discovery routine will return a result. The connection routine then uses the address to make a connection.
Bleak supports several other ways to discover devices. Name tends to be the most straightforward to work with in the beginning. For instance, if your device is named 'Tempsense10', you can use the discover function to look for all devices named 'Tempsense10'. To better future-proof your design, you might want to start incorporating discovery based on the device UUID or the device's manufacturing data.
Now, in your original file, your code calling into the class should look like this.
import asyncio
import deviceble
async def main():
device = deviceble.DeviceBle()
try:
await device.connect()
await device.disconnect()
except Exception as e:
print(e)
asyncio.run(main())
It's so clean. The possibilities are now endless! Let's continue.
Reading Characterisitics
Now that we can connect and disconnect from the device, it's time to expand on what our class can provide. We'll begin by writing a generic function to read characteristics. Then, we'll write a device specific function for reading the battery level.
To start, I'll create the definition for a read characteristic function.
async def read_characteristic(self,uuid):
try:
await self.client.read_gatt_char(uuid)
except:
raise Exception("Failed to read characteristic.")
This function is pretty straightforward. I like to have it isolated rather than combined, even though it's just one line of code. This allows you to raise the exception from a single place. That makes the higher level functions easier to write.
Now, I need to create a high-level function that will perform a device specific interaction. For this tutorial, I'll use the example of getting the battery life. First, add the UUID definition to the init function.
def __init__(self):
self.client = None
self.uuid_battery_service = '0000180f-0000-1000-8000-00805f9b34fb'
self.uuid_battery_level_characteristic = '00002a19-0000-1000-8000-00805f9b34fb'
Next, use our read characteristic function. This is a great place to hide any formatting of your bytes. In this case, we convert the byte array to an integer. This let's us report the battery level back as an even number.
async def read_battery_level(self):
battery_level = await self.read_characteristic(self.uuid_battery_level_characteristic)
return int.from_bytes(battery_level)
All together, it should look like this.
import asyncio
from bleak import BleakScanner, BleakClient
class DeviceBle():
def __init__(self):
self.client = None
self.uuid_battery_service = '0000180f-0000-1000-8000-00805f9b34fb'
self.uuid_battery_level_characteristic = '00002a19-0000-1000-8000-00805f9b34fb'
async def discover(self):
devices = await BleakScanner.discover(5.0, return_adv=True)
for device in devices:
advertisement_data = devices[device][1]
if(advertisement_data.local_name == "Wa"):
if(advertisement_data.rssi > -90):
self.device = devices[device]
return device
async def connect(self):
address = await self.discover()
if address is not None:
try:
print("Found device at address: %s" % (address))
print("Attempting to connect...")
self.client = BleakClient(address)
await self.client.connect()
print("Connected")
except:
raise Exception("Failed to connect")
else:
raise Exception("Did not find available devices")
async def disconnect(self):
try:
print("Disconnecting...")
await self.client.disconnect()
print("Disconnected!")
except:
raise Exception("Warning: Failed to disconnect. Check for hanging connection")
async def read_characteristic(self,uuid):
try:
return await self.client.read_gatt_char(uuid)
except:
raise Exception("Failed to read characteristic.")
async def read_battery_level(self):
battery_level = await self.read_characteristic(self.uuid_battery_level_characteristic)
return int.from_bytes(battery_level)
Now we can add this function to our routine.
import asyncio
import deviceble
async def main():
device = deviceble.DeviceBle()
try:
await device.connect()
battery_level = await device.read_battery_level()
await device.disconnect()
except Exception as e:
print(e)
asyncio.run(main())
Conclusion
In this blog, we have done the basics of outlining a Bluetooth class for a custom device. Creating a device class takes a lot of the headaches out of scaling the code in a variety of different ways. We didn't touch on any direct examples, but I've used this method for creating test routines in addition to incorporating it into a UI. Hopefully your code can look a little cleaner, and your workflow can get a little more efficient.
If you've made it to the end of this tutorial and find the prospect a little bit daunting, you should check out Wavecake. The tool includes a library and environment. It can help you share your work with your team and break down friction involved in automation for embedded devices.
I've included the complete example code for the device class below.
import asyncio
from bleak import BleakScanner, BleakClient
class DeviceBle():
def __init__(self):
self.client = None
self.uuid_battery_service = '0000180f-0000-1000-8000-00805f9b34fb'
self.uuid_battery_level_characteristic = '00002a19-0000-1000-8000-00805f9b34fb'
async def discover(self):
devices = await BleakScanner.discover(5.0, return_adv=True)
for device in devices:
advertisement_data = devices[device][1]
if(advertisement_data.local_name == "Wa"):
if(advertisement_data.rssi > -90):
self.device = devices[device]
return device
async def connect(self):
address = await self.discover()
if address is not None:
try:
print("Found device at address: %s" % (address))
print("Attempting to connect...")
self.client = BleakClient(address)
await self.client.connect()
print("Connected")
except:
raise Exception("Failed to connect")
else:
raise Exception("Did not find available devices")
async def disconnect(self):
try:
print("Disconnecting...")
await self.client.disconnect()
print("Disconnected!")
except:
raise Exception("Warning: Failed to disconnect. Check for hanging connection")
async def read_characteristic(self,uuid):
try:
return await self.client.read_gatt_char(uuid)
except:
raise Exception("Failed to read characteristic.")
async def read_battery_level(self):
battery_level = await self.read_characteristic(self.uuid_battery_level_characteristic)
return int.from_bytes(battery_level)