Газоанализатор Online. Прием данных с датчиков.

Газоанализатор Online. Прием данных с датчиков.

Проектирование.

Протокол общения с датчиками довольно простой: стартовый байт 0xFF, 7 байт данных и один байт контрольной суммы.

Машина состояний приемника на UML будет такая.

Контрольная сумма может как быть в пакете (последним байтом), так и прилететь отдельно (UART же). Я набросал черновик получения контрольной суммы и ее проверки. Визуализация это важно.

Реализация.

Парсер протокола сделаем с явно заданной таблицей состояний. Машина состояний реализуется библиотекой [transitions](https://github.com/pytransitions/transitions). Объект описывающий состояние машины и переключение состояний у нее зачем-то вынесен в отдельный класс (Matter). При инициализации машины состояний в него добавляются методы запроса и переключения состояний. Конструктор парсера будет таким:

class Receiver:
    class Matter(object):
        pass

    def __init__(self):
        self._lump = self.Matter()

        states = ["wait_start", "receiving", "wait_checksum", "done"]
        transitions = [
            {"trigger": "start", "source": "wait_start", "dest": "receiving"},
            {"trigger": "checksum", "source": "receiving", "dest": "wait_checksum"},
            {"trigger": "done", "source": "wait_checksum", "dest": "done"},
            {"trigger": "restart", "source": "wait_checksum", "dest": "wait_start"}
        ]

        self._state_machine = Machine(model=self._lump, states=states, transitions=transitions, initial="wait_start")

        self._data = bytearray()

Основной метод приема данных:

def put(self, data):
    if self._lump.is_wait_start():
        incoming_packet = self._skip_start_byte(data)
        if not incoming_packet:
            return
        self._lump.start()
    else:
        incoming_packet = data

    last_idx = None
    if self._lump.is_receiving():
        last_idx = min(7 - len(self._data), len(incoming_packet))
        self._data += incoming_packet[:last_idx]
        if len(self._data) == 7:
            self._lump.checksum()

    if self._lump.is_wait_checksum():
        if last_idx and (last_idx < len(incoming_packet)):    # Checksum in current packet
            checksum = incoming_packet[last_idx]
        elif last_idx and (last_idx == len(incoming_packet)): # Checksum in next packet
            return
        else:                                   # Separated checksum from next packet
            checksum = incoming_packet[0]

        calculated_checksum = calc_checksum([0xFF] + list(self._data))
        if checksum == calculated_checksum:
            self._lump.done()
        else:
            self._data = bytearray()
            self._lump.restart()

Простенький main:

#!/usr/bin/env python

# Project FartCHECKER
# Dmitriy Vetutnev, 2021


import serial
from uart import Receiver
from time import localtime, asctime


def main():
    port = serial.Serial("/dev/ttyUSB0")
    while True:
        rx = Receiver()

        packet = port.read(9)
        print(packet)

        rx.put(packet)
        data = rx.get_data()

        concentration = (data[1] * 256) + data[2]
        ts = asctime(localtime())
        print("%s %s ppm" % (ts, concentration))


if __name__ == "__main__":
    main()

Тестирование.

Перевернутая банка, сигарета, к банке на скотч приклеен датчик:

Текущая версия кода тут.

Subscribe to Заметочки

Don’t miss out on the latest issues. Sign up now to get access to the library of members-only issues.
jamie@example.com
Subscribe