Как читать RTSP / видео кадры и асинхронно загружать их в облачное хранилище

ИСПОЛЬЗОВАТЬ СЛУЧАЙ

Я полагаю, что в этом современном мире большинство из нас знакомо с множеством новых отраслей, в которых используются приложения компьютерного зрения. В частности, камеры видеонаблюдения и видеоанализ, которые играют важную роль в технологиях компьютерного зрения.

Например, когда мы анализируем камеру видеонаблюдения, в качестве первого шага мы должны прочитать URL-адрес RTSP с помощью OpenCV, а затем сохранить его где-нибудь в облаке для дальнейшего анализа.

Но проблема в том, что когда мы загружаем кадры один за другим в облако, загрузка займет некоторое время, не так ли?

Чтобы получить четкое представление об этом, я провел эксперимент с ведром Google, и он подсчитал, что один кадр занимает 1,05 секунды для загрузки корзины Google. Из-за этого нам придется ждать 1 секунду, чтобы получить ответ, а затем нам нужно загрузить следующие кадры в строке.

Я был наплевать, чтобы найти решение для этого, в вашем ожидании, вот оно!

Решение состоит в том, что мы можем загружать кадры асинхронно, используя Celery.

Когда мы загружаем кадры асинхронным способом, мы не можем получить кадры последовательности, в качестве средства мы должны использовать концепцию группы и цепочки в Celery.

Многим (не имеющим опыта) было бы интересно узнать; Что такое сельдерей?

Сельдерей - один из самых популярных фонов для менеджеров по работе в мире Python.

Celery совместим с несколькими брокерами сообщений, такими как RabbitMQ или Redis. Они могут действовать и как производитель, и как потребитель. Кроме того, Celery - это асинхронная очередь задач / очередь заданий, основанная на распределенной передаче сообщений. В дополнение к этому, он ориентирован на операции в реальном времени и поддерживает планирование.

Уточнив определение, давайте посмотрим ниже, как настроить сельдерей с помощью кода Python.

Шаг 1. Импортируйте весь необходимый пакет сельдерея.

from celery import Celery
from celery.result import AsyncResult
from celery.result import allow_join_result
from celery.decorators import periodic_task

Шаг 2: - Мы должны настроить брокера и серверную часть в сельдерее. Я использовал Redis в качестве бэкэнда, поэтому установите Redis в вашу систему и убедитесь, что он работает успешно;

app = Celery(‘tasks’, backend=’redis://[email protected]:6379', broker=’redis://[email protected]:6379')

Шаг 3: - Чтобы вызвать функцию асинхронным способом, мы должны поместить в функцию «аннотацию @ app.taks».

Вот пример кода сельдерея для загрузки кадров в корзину Google.

@app.task(bind=True, max_retries=30)
def upload_frames_gcs(self, file_name):
    try:
        url = upload_file_to_gcs(file_name)
        return url
    except Exception as e:
        raise self.retry(exc=e)

Шаг 4: - Ниже приведены наиболее важные шаги:

Мы не сможем напрямую вызывать функцию и загружать кадры асинхронным способом, потому что мы не можем получить кадры последовательности после загрузки, поэтому мы должны использовать концепцию Цепочки и Группы в сельдерее для загрузки кадров в ведро. Используя эту технику, мы можем загружать до 5 или 10 кадров параллельно, а также можем получить последовательность кадров. Однако, прежде чем переходить к кодированию, давайте сначала рассмотрим что такое цепочки и группы в Celery.

Цепи в сельдерее

Цепочка - это примитив, который позволяет нам связать больше задач в одну особую сигнатуру, так что она называется «одна за другой, по сути формируя цепочку обратных вызовов».

Возможно, если вы все еще не уверены, тем не менее, приведенная ниже диаграмма даст вам четкое представление о том, как работает цепочка у сельдерея. Это id задачи в сельдерее.

Группы в сельдерее

Групповой примитив - это подпись, которая принимает список задач, которые должны выполняться параллельно.

Это пример кода, объясняющий, Как я загружал кадры в ведро Google, используя методы групп и цепочек в сельдерее.

jobs = group(upload_frames_gcs.s(file_name, ts) for ts, file_name in file_name_dic.items())
result = jobs.apply_async()

Понятно, что я вызываю функцию upload_frames_gcs внутри метода группы, после чего вы можете увидеть «s» для передачи параметра под названием «Концепция цепочек» в сельдерея, это позволяет связать подписи, в результате «Один вызывается за другим, по сути образуя цепочку обратных вызовов». Наконец, мы можем получить группу результатов в одной задаче.

Шаг 5: - Неудивительно, если вы думаете, как получить URL фреймов после загрузки в сельдерее. Вот так просто: в переменной результата вы можете получить идентификатор задачи этой групповой функции, и мы можем использовать идентификатор задачи для получения результатов.

Однако будьте осторожны, проверяя статус задачи, и как только она будет завершена, мы сможем получить URL-адрес фреймов.

def taskid_status(task_id_array):
        for task in task_id_array:
            if task.successful():
                task_id_array.remove(task)
                with allow_join_result():
                    frames_array = []
                    for results in task.join(): 
                        frame_dic = {}
                        frame_dic['frame_url'] = results[0]
                        frames_array.append(frame_dic)
       return task_id_array, frames_array

В переменной frames_array вы можете получить все кадры с отметкой времени.

Я тестировал производительность на нескольких разных тестовых примерах.

  1. Загрузка 5 кадров в хранилище Google занимает 0,85 секунды.
  2. Загрузка 10 кадров в хранилище Google занимает от 0,77 до 0,82 секунды.
  3. Загрузка в хранилище Google 15 кадров занимает от 0,9 до 1,0 секунды.
  4. 30 кадров загружаются в хранилище Google за 0,7–0,8 секунды.

Из приведенного выше очевидно, что нет большой разницы в увеличении количества кадров для загрузки в корзину, потому что многопроцессорность используется для одновременного выполнения задач в сельдерее.

Если вам нужна дополнительная помощь или помощь. Напишите мне balavenkatesh.com 📬 Я рада Вам помочь.