Газоанализатор Online. Асинхронность.
Что-то пошло не так.
Для явной обработки ошибок ввода/вывода и таймаутов реализовать взаимодействие с газовыми датчиками я планировал при помощи state-машин. Машина состояний для перевода датчика в режим запрос-ответ и чтение данных была разработана такая:
Но на Python библиотеки state-машин оказались какими-то совсем беззубыми. Они и близко ни поддерживают то, что в заложено в машины состояний UML и реализовано в Boost.SML/Boost.MSM из мира C++. Не получалось реализовать механизм, когда в state-машину просто передаешь байты, она сама выполняет необходимые действия, после обработки очередного байта приостанавливается (сохраняя состояние), а в это время процесс занимается чем-то еще (кооперативная многозадачность, нужна для работы с несколькими датчиками внутри одного процесса).
Но у Python есть механизм async/await, на котором можно реализовать подобный функционал. Нахождение в состоянии - await (states в терминологии UML), выполнение действий - вызов функций (actions), ветвления - условия перехода (guards), а переход от одного await к другому - переключение между состояниями (transitions). По сути получается та же state-машина, только вывернутая наизнанку в код и без выделенной таблицы переходов. Еще нужна возможность асинхронно (await) получать по одному байту из последовательного порта, но существующий модуль pyserial-asyncio такой возможности из коробки не предоставляет.
Проверка реализуемости.
Идея была подсмотрена внутри pyserial-asyncio. При запросе байта запускается мониторинг файлового дескриптора последовательного порта, создается Future и на этом выполнение метода readByte приостанавливается. При срабатывании callback считывается один байт из порта, останавливается мониторинг (только один же байт запрошен) и устанавливается значение Future. Вызвавший код получает свой байт и возобновляет выполнение, в этом примере - цикл в функции main.
import serial
import asyncio
class ASerial:
_serial = serial.Serial(timeout=0, write_timeout=0)
def __init__(self, port, baudrate):
self._serial.port = port
self._serial.baudrate = baudrate
self._serial.open()
async def readByte(self):
loop = asyncio.get_running_loop()
future = loop.create_future()
def cb():
d = self._serial.read(1)
loop.remove_reader(self._serial.fileno())
future.set_result(d)
loop.add_reader(self._serial.fileno(), cb)
return await future
async def main():
s = ASerial("/dev/ttyUSB0", 9600)
while True:
c = await s.readByte()
print(c)
if __name__ == "__main__":
asyncio.run(main())
При запуске в консоли:
(venv) dvetutnev@vulpecula:~/fart-checker$ python aserial.py
b'\xff'
b'\x86'
b'\x00'
b'\x00'
b'\x04'
b'\x00'
b'\x00'
b'\x00'
b'v'
b'\xff'
b'\x86'
b'\x00'
b'\x00'
b'\x04'
b'\x00'
b'\x00'
b'\x00'
b'v'
Производительность такой реализации сомнительная (на каждый байт аж целый Future инстанционируется), но для моих целей хватит. Простое решение работает лучше.
Do it.
Из коробки доступы асинхронные mock-и, поэтому разработку буду вести сверху вниз.
Разрабатываемая асинхронная функция - readSensor. Отправляет в порт команду переключения в режим запрос/ответ, дожидается ответа подтверждения смены режима (с таймаутом) и запускает цикл запроса/получения (с таймаутом) и обработки данных с датчика. Принимает в параметрах экземпляр ASerial, объект с константами порта и функций парсинга пакета данных, и callback для отправки подготовленных данных.
Тест переключения режима и тест основного цикла.
Реализация:
import gas_sensor
import asyncio
from asyncio import TimeoutError
from serial import SerialException
async def readSensor(port, gasSensor, dashBoard):
try:
async def switchMode():
await port.write(gasSensor.switch_mode_cmd)
while True:
packet = await gas_sensor.readPacket(port)
if packet == gasSensor.approve_switch_mode:
return
await asyncio.wait_for(switchMode(), 1)
async def getSample():
await port.write(gasSensor.read_cmd)
return await gas_sensor.readPacket(port)
while True:
sample = await asyncio.wait_for(getSample(), 1)
item = gasSensor.parsePacket(sample)
dashBoard(item)
await asyncio.sleep(1)
except (SerialException, TimeoutError) as ex:
raise gas_sensor.ASerialException(ex)
State-машина из начала поста изображена на одной картинке, поэтому для большей наглядности реализация выполнена одной функцией. Для этого нужна различная реакция asyncio.wait_for (await для переданной корутины или выброс исключения) в одном тест-кэйсе. AsyncMock такого из коробки не позволяет. Для этого был сделан генератор, вызывающий (если точнее выполняющий await) функции передавая в них первый аргумент из asyncio.wait_for, либо бросающий исключение. Генератор инициализируется списком функций/исключений и присваивается полю side_effect экземпляра AsyncMock.
def awaitOrRaise(items):
def getItem():
for item in items:
if isclass(item) and issubclass(item, BaseException):
raise item
yield item
generator = getItem()
async def effect(arg, *args, **kwargs):
f = next(generator)
return await f(arg)
return effect
Теперь функция чтения пакета из порта:
async def readPacket(port):
packet = bytes()
while True:
c = await port.readByte()
if c == b"\xFF":
packet += c
break
while len(packet) < 9:
packet += await port.readByte()
if packet[-1] != gas_sensor.calcChecksum(packet):
raise gas_sensor.ASerialException("Invalid checksum")
return packet
И минимальный main для демонстрации работы.
Заглавная картинка взята тут.