Газоанализатор Online. Прием данных с датчиков.
Проектирование.
Протокол общения с датчиками довольно простой: стартовый байт 0xFF, 7 байт данных и один байт контрольной суммы.
Машина состояний приемника на UML будет такая.
Контрольная сумма может как быть в пакете (последним байтом), так и прилететь отдельно (UART же). Я набросал черновик получения контрольной суммы и ее проверки. Визуализация это важно.
Реализация.
Парсер протокола сделаем с явно заданной таблицей состояний. Машина состояний реализуется библиотекой [transitions](https://github.com/pytransitions/transitions). Объект описывающий состояние машины и переключение состояний у нее зачем-то вынесен в отдельный класс (Matter). При инициализации машины состояний в него добавляются методы запроса и переключения состояний. Конструктор парсера будет таким:
class Receiver:
class Matter(object):
pass
def __init__(self):
self._lump = self.Matter()
states = ["wait_start", "receiving", "wait_checksum", "done"]
transitions = [
{"trigger": "start", "source": "wait_start", "dest": "receiving"},
{"trigger": "checksum", "source": "receiving", "dest": "wait_checksum"},
{"trigger": "done", "source": "wait_checksum", "dest": "done"},
{"trigger": "restart", "source": "wait_checksum", "dest": "wait_start"}
]
self._state_machine = Machine(model=self._lump, states=states, transitions=transitions, initial="wait_start")
self._data = bytearray()
Основной метод приема данных:
def put(self, data):
if self._lump.is_wait_start():
incoming_packet = self._skip_start_byte(data)
if not incoming_packet:
return
self._lump.start()
else:
incoming_packet = data
last_idx = None
if self._lump.is_receiving():
last_idx = min(7 - len(self._data), len(incoming_packet))
self._data += incoming_packet[:last_idx]
if len(self._data) == 7:
self._lump.checksum()
if self._lump.is_wait_checksum():
if last_idx and (last_idx < len(incoming_packet)): # Checksum in current packet
checksum = incoming_packet[last_idx]
elif last_idx and (last_idx == len(incoming_packet)): # Checksum in next packet
return
else: # Separated checksum from next packet
checksum = incoming_packet[0]
calculated_checksum = calc_checksum([0xFF] + list(self._data))
if checksum == calculated_checksum:
self._lump.done()
else:
self._data = bytearray()
self._lump.restart()
Простенький main:
#!/usr/bin/env python
# Project FartCHECKER
# Dmitriy Vetutnev, 2021
import serial
from uart import Receiver
from time import localtime, asctime
def main():
port = serial.Serial("/dev/ttyUSB0")
while True:
rx = Receiver()
packet = port.read(9)
print(packet)
rx.put(packet)
data = rx.get_data()
concentration = (data[1] * 256) + data[2]
ts = asctime(localtime())
print("%s %s ppm" % (ts, concentration))
if __name__ == "__main__":
main()
Тестирование.
Перевернутая банка, сигарета, к банке на скотч приклеен датчик:
Текущая версия кода тут.