Газоанализатор Online. Асинхронность.

Газоанализатор Online. Асинхронность.

Что-то пошло не так.

Для явной обработки ошибок ввода/вывода и таймаутов реализовать взаимодействие с газовыми датчиками я планировал при помощи state-машин. Машина состояний для перевода датчика в режим запрос-ответ и чтение данных была разработана такая:

Но на Python библиотеки state-машин оказались какими-то совсем беззубыми. Они и близко ни поддерживают то, что в заложено в машины состояний UML и реализовано в Boost.SML/Boost.MSM из мира C++. Не получалось реализовать механизм, когда в state-машину просто передаешь байты, она сама выполняет необходимые действия, после обработки очередного байта приостанавливается (сохраняя состояние), а в это время процесс занимается чем-то еще (кооперативная многозадачность, нужна для работы с несколькими датчиками внутри одного процесса).

Но у Python есть механизм async/await, на котором можно реализовать подобный функционал. Нахождение в состоянии - await (states в терминологии UML), выполнение действий  - вызов функций (actions), ветвления - условия перехода (guards), а переход от одного await к другому - переключение между состояниями (transitions). По сути получается та же state-машина, только вывернутая наизнанку в код и без выделенной таблицы переходов. Еще нужна возможность асинхронно (await) получать по одному байту из последовательного порта, но существующий модуль pyserial-asyncio такой возможности из коробки не предоставляет.

Проверка реализуемости.

Идея была подсмотрена внутри pyserial-asyncio. При запросе байта запускается мониторинг файлового дескриптора последовательного порта, создается Future и на этом выполнение метода readByte приостанавливается. При срабатывании callback считывается один байт из порта, останавливается мониторинг (только один же байт запрошен) и устанавливается значение Future. Вызвавший код получает свой байт и возобновляет выполнение, в этом примере - цикл в функции main.

import serial
import asyncio


class ASerial:
    _serial = serial.Serial(timeout=0, write_timeout=0)


    def __init__(self, port, baudrate):
        self._serial.port = port
        self._serial.baudrate = baudrate

        self._serial.open()


    async def readByte(self):
        loop = asyncio.get_running_loop()
        future = loop.create_future()

        def cb():
            d = self._serial.read(1)
            loop.remove_reader(self._serial.fileno())
            future.set_result(d)

        loop.add_reader(self._serial.fileno(), cb)

        return await future


async def main():
    s = ASerial("/dev/ttyUSB0", 9600)
    while True:
        c = await s.readByte()
        print(c)


if __name__ == "__main__":
    asyncio.run(main())

При запуске в консоли:

(venv) dvetutnev@vulpecula:~/fart-checker$ python aserial.py 
b'\xff'
b'\x86'
b'\x00'
b'\x00'
b'\x04'
b'\x00'
b'\x00'
b'\x00'
b'v'
b'\xff'
b'\x86'
b'\x00'
b'\x00'
b'\x04'
b'\x00'
b'\x00'
b'\x00'
b'v'

Производительность такой реализации сомнительная (на каждый байт аж целый Future инстанционируется), но для моих целей хватит. Простое решение работает лучше.

Do it.

Из коробки доступы асинхронные mock-и, поэтому разработку буду вести сверху вниз.

Разрабатываемая асинхронная функция - readSensor. Отправляет в порт команду переключения в режим запрос/ответ, дожидается ответа подтверждения смены режима (с таймаутом) и запускает цикл запроса/получения (с таймаутом) и обработки данных с датчика. Принимает в параметрах экземпляр ASerial, объект с константами порта и функций парсинга пакета данных, и callback для отправки подготовленных данных.

Тест переключения режима и тест основного цикла.

Реализация:

import gas_sensor
import asyncio

from asyncio import TimeoutError
from serial import SerialException


async def readSensor(port, gasSensor, dashBoard):
    try:
        async def switchMode():
            await port.write(gasSensor.switch_mode_cmd)
            while True:
                packet = await gas_sensor.readPacket(port)
                if packet == gasSensor.approve_switch_mode:
                    return

        await asyncio.wait_for(switchMode(), 1)

        async def getSample():
            await port.write(gasSensor.read_cmd)
            return await gas_sensor.readPacket(port)

        while True:
            sample = await asyncio.wait_for(getSample(), 1)
            item = gasSensor.parsePacket(sample)
            dashBoard(item)
            await asyncio.sleep(1)


    except (SerialException, TimeoutError) as ex:
        raise gas_sensor.ASerialException(ex)

State-машина из начала поста изображена на одной картинке, поэтому для большей наглядности реализация выполнена одной функцией. Для этого нужна различная реакция asyncio.wait_for (await для переданной корутины или выброс исключения)  в одном тест-кэйсе. AsyncMock такого из коробки не позволяет. Для этого был сделан генератор, вызывающий (если точнее выполняющий await) функции передавая в них первый аргумент из asyncio.wait_for, либо бросающий исключение. Генератор инициализируется списком функций/исключений и присваивается полю side_effect экземпляра AsyncMock.

def awaitOrRaise(items):
    def getItem():
        for item in items:
            if isclass(item) and issubclass(item, BaseException):
                raise item
            yield item

    generator = getItem()

    async def effect(arg, *args, **kwargs):
        f = next(generator)
        return await f(arg)

    return effect

Теперь функция чтения пакета из порта:

async def readPacket(port):
    packet = bytes()

    while True:
        c = await port.readByte()
        if c == b"\xFF":
            packet += c
            break

    while len(packet) < 9:
        packet += await port.readByte()

    if packet[-1] != gas_sensor.calcChecksum(packet):
        raise gas_sensor.ASerialException("Invalid checksum")

    return packet

И минимальный main для демонстрации работы.


Заглавная картинка взята тут.

Subscribe to Заметочки

Don’t miss out on the latest issues. Sign up now to get access to the library of members-only issues.
jamie@example.com
Subscribe