PiZero W подключен к двум периферийным устройствам (GPIO и USB): как непрерывно читать с обоих одновременно?

У меня есть raspberry pizero W, подключенный через контакты GPIO к расходомеру и USB к сканеру штрих-кода. У меня есть скрипт python, который использует функцию обратного вызова, чтобы получать предупреждения при обнаружении ввода GPIO. Этот скрипт python должен постоянно работать на pizero, чтобы распознавать, когда расходомер активирован, и обрабатывать ввод.

Проблема в том, что у меня также есть сканер штрих-кода, подключенный через USB к pizero. Я бы хотел, чтобы pizero также распознавал сканирование штрих-кода и обрабатывал этот ввод.

Затем pizero должен отправить сообщение, которое включает как информацию от расходомера, так и информацию со сканера штрих-кода.

Есть ли способ сделать это в том же скрипте Python? Как я могу заставить pizero прослушивать и обрабатывать два входа одновременно? Было бы легче реализовать разделение этого на два разных сценария, и если да, могу ли я запустить их оба одновременно и как-то объединить информацию, которую они предоставляют, в третьем непрерывно выполняемом сценарии?

Спасибо!

Некоторые пояснения к комментариям (спасибо за ваш вклад):

  • входной контакт расходомера GPIO 17, который является соединением SPI
  • также подключите вывод питания 5 В и заземления.

Скрипт необходимо запускать при запуске системы. Я посмотрю на systemctl, поскольку я не слышал о нем, пока об этом не упомянули.

Pi обычно распознает сканируемый штрих-код как ввод с клавиатуры (то есть серию цифр, за которыми следует символ новой строки), когда расходомер не подключен.

Когда я отправляю сообщение, содержащее информацию о расходомере и штрих-коде, мне нужно отправить объект JSON из python, который включает как фрагменты информации, так и отметку времени, когда информация была получена.

Этот объект JSON будет отправлен по Wi-Fi на сервер Raspberry Pi со статическим IP-адресом в той же домашней сети, что и pizero. Сервер raspberry pi имеет доступ к базе данных Django, которая должна включать информацию об объекте JSON в базу данных.


person BeginnersMindTruly    schedule 21.10.2019    source источник
comment
В вашем вопросе отсутствуют некоторые детали. Как подключается расходомер по GPIO - это I2C, SPI, последовательный? Нужно ли всегда запускать скрипт при запуске системы, и работали ли вы с этой частью с помощью systemctl? Как Pi обычно распознает сканируемый штрих-код - даже если расходомер не подключен? Вам нужно отправить сообщение, когда что-то происходит - как это должно произойти? Вай фай? Серийный? Что это за сообщение - изображение? Какой-то текст?   -  person Mark Setchell    schedule 21.10.2019
comment
Спасибо, Марк. Уточнено выше, благодарю за вопросы и отзывы. Полезный   -  person BeginnersMindTruly    schedule 22.10.2019


Ответы (2)


Обновленный ответ

Я добавил код для считывателя штрих-кода. Я сделал так, что считывателю штрих-кода требуется разное время, до 5 секунд, чтобы считывать показания, а расходомеру - постоянные 0,5 секунды, поэтому вы можете видеть, что разные потоки выполняются с разной скоростью независимо друг от друга.

#!/usr/bin/env python3

from threading import Lock
import threading
import time
from random import seed
from random import random

# Dummy function to read SPI as I don't have anything attached
def readSPI():
    # Take 0.5s to read
    time.sleep(0.5)
    readSPI.static += 1
    return readSPI.static
readSPI.static=0

class FlowMeter(threading.Thread):
    def __init__(self):
        super(FlowMeter, self).__init__()
        # Create a mutex
        self.mutex = Lock()
        self.currentReading = 0

    def run(self):
        # Continuously read flowmeter and safely update self.currentReading
        while True:
            value = readSPI()
            self.mutex.acquire()
            self.currentReading = value
            self.mutex.release()

    def read(self):
        # Main calls this to get latest reading, we just grab it from internal variable
        self.mutex.acquire()
        value = self.currentReading
        self.mutex.release()
        return value

# Dummy function to read Barcode as I don't have anything attached
def readBarcode():
    # Take variable time, 0..5 seconds, to read
    time.sleep(random()*5)
    result = "BC" + str(int(random()*1000))
    return result

class BarcodeReader(threading.Thread):
    def __init__(self):
        super(BarcodeReader, self).__init__()
        # Create a mutex
        self.mutex = Lock()
        self.currentReading = 0

    def run(self):
        # Continuously read barcode and safely update self.currentReading
        while True:
            value = readBarcode()
            self.mutex.acquire()
            self.currentReading = value
            self.mutex.release()

    def read(self):
        # Main calls this to get latest reading, we just grab it from internal variable
        self.mutex.acquire()
        value = self.currentReading
        self.mutex.release()
        return value

if __name__ == '__main__':

    # Generate repeatable random numbers
    seed(42)

    # Instantiate and start flow meter manager thread
    fmThread = FlowMeter()
    fmThread.daemon = True
    fmThread.start()

    # Instantiate and start barcode reader thread
    bcThread = BarcodeReader()
    bcThread.daemon = True
    bcThread.start()

    # Now you can do other things in main, but always get access to latest readings
    for i in range(20):
        fmReading = fmThread.read()
        bcReading = bcThread.read()
        print(f"Main: i = {i} FlowMeter reading = {fmReading}, Barcode={bcReading}")
        time.sleep(1)

Пример вывода

Main: i = 0 FlowMeter reading = 0, Barcode=0
Main: i = 1 FlowMeter reading = 1, Barcode=0
Main: i = 2 FlowMeter reading = 3, Barcode=0
Main: i = 3 FlowMeter reading = 5, Barcode=0
Main: i = 4 FlowMeter reading = 7, Barcode=BC25
Main: i = 5 FlowMeter reading = 9, Barcode=BC223
Main: i = 6 FlowMeter reading = 11, Barcode=BC223
Main: i = 7 FlowMeter reading = 13, Barcode=BC223
Main: i = 8 FlowMeter reading = 15, Barcode=BC223
Main: i = 9 FlowMeter reading = 17, Barcode=BC676
Main: i = 10 FlowMeter reading = 19, Barcode=BC676
Main: i = 11 FlowMeter reading = 21, Barcode=BC676
Main: i = 12 FlowMeter reading = 23, Barcode=BC676
Main: i = 13 FlowMeter reading = 25, Barcode=BC86
Main: i = 14 FlowMeter reading = 27, Barcode=BC86
Main: i = 15 FlowMeter reading = 29, Barcode=BC29
Main: i = 16 FlowMeter reading = 31, Barcode=BC505
Main: i = 17 FlowMeter reading = 33, Barcode=BC198
Main: i = 18 FlowMeter reading = 35, Barcode=BC198
Main: i = 19 FlowMeter reading = 37, Barcode=BC198

Исходный ответ

Я бы посоветовал вам взглянуть на systemd и systemctl, чтобы ваше приложение запускалось при каждом запуске системы - например, здесь.

Что касается одновременного мониторинга двух вещей, я бы посоветовал вам использовать модуль Python threading. Вот краткий пример. Я создаю объект, являющийся подклассом threading, который управляет вашим расходомером, постоянно считывая его и сохраняя текущее значение в переменной, которую основная программа может прочитать в любое время. Вы можете запустить другой аналогичный, который управляет вашим считывателем штрих-кода, и запускать их параллельно. Я не хотел этого делать и запутывать вас двойным кодом.

#!/usr/bin/env python3

from threading import Lock
import threading
import time

# Dummy function to read SPI as I don't have anything attached
def readSPI():
    readSPI.static += 1
    return readSPI.static
readSPI.static=0

class FlowMeter(threading.Thread):
    def __init__(self):
        super(FlowMeter, self).__init__()
        # Create a mutex
        self.mutex = Lock()
        self.currentReading = 0

    def run(self):
        # Continuously read flowmeter and safely update self.currentReading
        while True:
            value = readSPI()
            self.mutex.acquire()
            self.currentReading = value
            self.mutex.release()
            time.sleep(0.01)

    def read(self):
        # Main calls this to get latest reading, we just grab it from internal variable
        self.mutex.acquire()
        value = self.currentReading
        self.mutex.release()
        return value

if __name__ == '__main__':

    # Instantiate and start flow meter manager thread
    fmThread = FlowMeter()
    fmThread.start()

    # Now you can do other things in main, but always get access to latest reading
    for i in range(100000):
        fmReading = fmThread.read()
        print(f"Main: i = {i} FlowMeter reading = {fmReading}")
        time.sleep(1)

Вы можете использовать logging для координации и унификации сообщений отладки и ведения журнала - см. здесь.

Вы можете посмотреть events, чтобы другие потоки знали, что что-то нужно делать, когда что-то достигает критического уровня - например, здесь.

person Mark Setchell    schedule 22.10.2019
comment
Спасибо, Марк, очень полезно. Итак, я бы просто создал еще один поток функций для штрих-кода и запустил его в основном? - person BeginnersMindTruly; 23.10.2019
comment
Кроме того, @MarkSetchell только что разместил здесь связанный вопрос: stackoverflow.com/questions/58565569/ кажется, что вы эксперт, хотел бы любой вклад. Спасибо! - person BeginnersMindTruly; 25.10.2019

Другой, возможно, более простой вариант - использовать Redis. Redis - это очень высокопроизводительный сервер структуры данных в памяти. Его просто установить на Raspberry Pi, Mac, Linux Windows или другую машину. Он позволяет вам обмениваться атомарными целыми числами, строками, списками, хешами, очередями, наборами и упорядоченными наборами между любым количеством клиентов в сети.

Таким образом, концепция может заключаться в том, чтобы иметь отдельную программу, отслеживающую расходомер и загружающую текущие показания в Redis так часто, как вы хотите. Затем другая отдельная программа считывает штрих-коды и вставляет их в Redis так часто, как вам нравится. И, наконец, имейте управляющую программу, возможно, где-нибудь еще в вашей сети, которая может захватывать оба значения так часто, как захочет.

Обратите внимание, что вы можете запустить сервер Redis на Raspberry Pi или любой другой машине.

Итак, вот программа расходомера - просто измените host на IP-адрес машины, на которой запущен Redis:

#!/usr/bin/env python3

import redis
import time

host='localhost'
port=6379

# Connect to Redis
r = redis.Redis(host,port)

reading = 0
while True:
   # Generate synthetic reading that just increases every 500ms
   reading +=1 
   # Stuff reading into Redis as "fmReading"
   r.set("fmReading",reading)
   time.sleep(0.5)

Вот программа для чтения штрих-кода:

#!/usr/bin/env python3

import redis
import time
from random import random, seed

host='localhost'
port=6379

# Connect to local Redis server
r = redis.Redis(host,port)

# Generate repeatable random numbers
seed(42)

while True:
   # Synthesize barcode and change every 2 seconds
   barcode = "BC" + str(int((random()*1000)))
   # Stuff barcode into Redis as "barcode"
   r.set("barcode",barcode)
   time.sleep(2)

А вот и основная программа управления:

#!/usr/bin/env python3

import redis
import time

host='localhost'
port=6379

# Connect to Redis server
r = redis.Redis(host,port)

while True:
   # Grab latest flowmeter reading and barcode
   fmReading = r.get("fmReading")
   barcode   = r.get("barcode")
   print(f"Main: fmReading={fmReading}, barcode={barcode}")
   time.sleep(1)

Пример вывода

Main: fmReading=b'10', barcode=b'BC676'
Main: fmReading=b'12', barcode=b'BC892'
Main: fmReading=b'14', barcode=b'BC892'
Main: fmReading=b'16', barcode=b'BC86'
Main: fmReading=b'18', barcode=b'BC86'
Main: fmReading=b'20', barcode=b'BC421'
Main: fmReading=b'22', barcode=b'BC421'
Main: fmReading=b'24', barcode=b'BC29'

Обратите внимание, что для отладки вы также можете получить любые показания с помощью интерфейса командной строки для Redis с любого компьютера в вашей сети, например в Терминале:

redis-cli get barcode
"BC775"

Если вы хотите отображать значения в веб-браузере, написанном на PHP, вы можете использовать привязки PHP к Redis для получения значений - очень удобно!

Конечно, вы можете настроить программу так, чтобы она отправляла временные метки показаний с каждым из них - возможно, используя хэш Redis, а не простой ключ. Вы также можете реализовать очередь для отправки сообщений между программой, используя Redis LPUSH и BRPOP.

Ключевые слова: Redis, список, очередь, хеш, Raspberry Pi.

person Mark Setchell    schedule 23.10.2019
comment
Другой вариант! Огромное спасибо. Не уверен, что хочу задействовать другую библиотеку, но посмотрю. Сервер raspberry pi будет настроен для размещения базы данных django, которая также отправляет и получает объекты json. Легко ли это интегрировать в Redis? - person BeginnersMindTruly; 24.10.2019