Оптимизация пересечения границы с обнаружением и отслеживанием объектов

Пересечение границы может быть захватывающей частью любой поездки, но разочарование от длинных очередей на пограничных переходах может быстро лишить удовольствия. Что, если бы мы могли определить самую быструю полосу движения и занять соответствующую позицию? Благодаря достижениям в области машинного зрения это стало возможным. Используя OpenCV и YOLOv3, алгоритм глубокого обучения, теперь можно обнаруживать и отслеживать движущиеся транспортные средства в режиме реального времени. В этом посте мы рассмотрим, как можно использовать эту технологию, чтобы помочь людям сэкономить время и избежать стресса от длинных очередей при пересечении границы.

Содержание

  • Обзор проблемы
  • Данные
  • Обнаружение объектов
    – Обнаружение автомобилей
    – Обнаружение полосы движения
  • Отслеживание объектов
  • Организовывать все
  • Ограничения
  • Будущая работа

Обзор проблемы

Основная идея этой задачи состоит в том, чтобы обрабатывать необработанные данные изображения, поступающие из прямых трансляций пограничных камер, для обнаружения и отслеживания движущихся транспортных средств. Задачу можно разделить на несколько шагов:

  • Получите необработанные данные изображения из прямой трансляции пограничных камер
  • Предварительно обработайте изображение
  • Применяйте алгоритмы для определения местоположения транспортных средств
  • Определить полосу для этих транспортных средств
  • Применяйте алгоритмы для определения скорости транспортных средств (и, в свою очередь, скорости каждой полосы)

Все эти шаги объединены и организованы с помощью docker-compose для запуска решения на нескольких граничных видеопотоках.

Данные

Видеоданные поступают с пограничных камер, открытых для публики. Предлагаемые здесь решения можно использовать в любой прямой трансляции (.m3u8) с пограничных/платных камер и легко адаптировать для других форм видео. Почти во всех странах мира есть какой-либо сервис, который позволяет пользователям в режиме реального времени видеть условия пересечения границы. Для этого конкретного случая я использовал прямые трансляции камер моей страны. По этой ссылке вы можете найти стримы.

Обнаружение объекта

Прежде чем запускать какие-либо алгоритмы, нам нужно загрузить поток и предварительно обработать изображение. Это легко сделать с помощью OpenCV2. Я создал функцию, которая изменяет размер изображения до 416x416 пикселей (формат, требуемый реализацией модели YOLOv3, которую я использовал).

def preprocess(image):
    resized_image = cv2.resize(image, (416, 416))
    blob = cv2.dnn.blobFromImage(resized_image, 1 / 255, (416, 416), swapRB=True, crop=False)
    return resized_image, blob

В цикле while изображения загружаются из потока и предварительно обрабатываются с помощью вышеуказанной функции. Все последующие изменения также будут производиться в этом цикле while. Дополнительно я проверяю, не изменилось ли что-то по сравнению с предыдущим кадром, так как нет необходимости снова обрабатывать изображение, если ничего не изменилось.

old_frame = None
while True:
    # read the stream and get the current image
    cap = cv2.VideoCapture(BORDER_URL.m3u8)
    ret, frame = cap.read()
    if not ret:
        break

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
    # resize the image to correct formant
    image, blob = preprocess(frame)
    
    # check if the image is the same as the previous one by calculating the mean squared error between the two
    if old_frame is not None:
        mse = np.mean((image - old_frame) ** 2)
        if mse < 0.001:
            old_frame = image
            continue

Обнаружение автомобиля

Теперь у нас все готово для обнаружения транспортных средств на изображении. Я использовал YOLOv3 для получения ограничительных рамок всех классов, на которых обучалась модель, но сохранил только те, которые являются транспортными средствами. Для загрузки этой модели необходимы грузы и файлы конфигурации, их можно скачать с официального сайта. Эта функция возвращает ограничивающие рамки, достоверность модели для обнаруженного объекта, класс объекта (например, автомобиль, грузовик, человек и т. д.) и центры каждой из ограничивающих рамок.

def detect_cars(image, blob):
    net = cv2.dnn.readNet("./yolov3.weights", "./yolov3.cfg")
    net.setInput(blob)
    layer_names = net.getLayerNames()
    output_layers = [layer_names[i - 1] for i in net.getUnconnectedOutLayers()]
    predictions = net.forward(output_layers)
    boxes = []
    confidences = []
    class_ids = []
    centers = []
    for prediction in predictions:
        for detection in prediction:
            scores = detection[5:]
            class_id = int(np.argmax(scores))
            confidence = float(scores[class_id])

            # Filter out weak detections
            if confidence > 0.5:
                # Get detection coordinates
                x, y, w, h = (detection[0:4] * np.array(
                    [image.shape[1], image.shape[0], image.shape[1], image.shape[0]])).astype("int")
                x = int(x - w / 2)
                y = int(y - h / 2)

                center_x = int(x + w // 2)
                center_y = int(y + h // 2)

                centers.append((center_x, center_y))
                boxes.append([int(x), int(y), int(x + w), int(y + h)])
                confidences.append(float(confidence))
                class_ids.append(class_id)
    
    return boxes, confidences, class_ids, centers

У нас есть вся информация о ограничивающих прямоугольниках, однако, когда мы запускаем приложение, мы видим, что есть много перекрывающихся прямоугольников, которые нужно удалить. Для этой задачи я использовал немаксимальное подавление (NMS). Немаксимальное подавление (NMS) — это алгоритм постобработки, используемый в задачах обнаружения объектов, в частности, для удаления избыточных или перекрывающихся ограничивающих рамок для одного и того же объекта.

NMS применяется для фильтрации избыточных ограничивающих рамок и сохранения только наиболее подходящих. Основная идея NMS состоит в том, чтобы сначала отсортировать ограничивающие рамки на основе их оценки достоверности обнаружения (т. Е. Вероятности того, что ограничивающая рамка содержит объект). Начиная с ограничивающей рамки с наивысшей оценкой достоверности, NMS подавляет все перекрывающиеся ограничивающие рамки, у которых значение Intersection over Union (IoU) превышает определенный порог (например, 0,5).

Следующая функция выполняет этот алгоритм. Он принимает в качестве параметров ограничивающие рамки, достоверность ограничивающих рамок и порог, который мы хотим отфильтровать дубликаты. Он возвращает идентификаторы ящиков, которые нам нужно сохранить.

def NMS(boxes, confidences, threshold):
    if len(boxes) == 0:
        return []

    boxes = np.array(boxes)
    x1 = boxes[:, 0]
    y1 = boxes[:, 1]
    x2 = boxes[:, 2]
    y2 = boxes[:, 3]

    scores = confidences
    areas = (x2 - x1 + 1) * (y2 - y1 + 1)
    order = scores.argsort()[::-1]

    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(i)
        xx1 = np.maximum(x1[i], x1[order[1:]])
        yy1 = np.maximum(y1[i], y1[order[1:]])
        xx2 = np.minimum(x2[i], x2[order[1:]])
        yy2 = np.minimum(y2[i], y2[order[1:]])

        w = np.maximum(0.0, xx2 - xx1 + 1)
        h = np.maximum(0.0, yy2 - yy1 + 1)
        inter = w * h
        ovr = inter / (areas[i] + areas[order[1:]] - inter)

        inds = np.where(ovr <= threshold)[0]
        order = order[inds + 1]

    return keep

После использования этого алгоритма для каждого транспортного средства сохраняется только одна ограничивающая рамка. Возможно, вам придется настроить пороговое значение, чтобы найти то, что работает для вас.

Обнаружение переулка

Хотя существуют алгоритмы обнаружения полосы движения, которые могут быть эффективными для этой задачи, они могут оказаться не самым эффективным вариантом. Поскольку камеры фиксированы, а положение дорожек не меняется, использование таких алгоритмов было бы пустой тратой времени. Чтобы решить эту проблему, я сохранил позиции дорожек в файле JSON в виде списка точек. Однако перед определением скорости полос остается еще одна проблема: определить, в какой полосе находится каждый автомобиль.

Для этого я создал следующие функции, которые определяют, где находится каждый автомобиль, в зависимости от его координаты x. Он возвращает индекс полосы, в которой находится автомобиль. Если автомобиль находится за пределами полосы движения, он возвращает -1.

def get_x_at_y(line, y):
    loc = -1
    for segment in range(len(line)-1):
        if line[segment][1] <= y <= line[segment + 1][1] or line[segment][1] >= y >= line[segment + 1][1]:
            loc = segment
    if loc == -1:
        return None
    slope = (line[segment+1][1] - line[segment][1]) / (line[segment+1][0]-line[segment][0])
    return (y-line[segment+1][1])/slope + line[segment+1][0]


def track_lanes(car_centers, image, lanes):
    if len(car_centers) < 2:
        return []
    ccs_with_pos = []
    for cc in car_centers:
        xs = []
        for lane in lanes:
            xs.append(get_x_at_y(lane, cc[1]))
        dists = []
        if None in xs:
            ccs_with_pos.append((cc, -1))
            continue
        for x in xs:
            dists.append(x-cc[0])
        if dists[0]>0 or dists[-1]<0:
            ccs_with_pos.append((cc, -1))
            continue
        # check if x value of the car is between each two lines
        for i in range(len(dists)-1):
            if dists[i] < 0 < dists[i + 1]:
                ccs_with_pos.append((cc, i))
                break
    return ccs_with_pos

У нас есть все готово, чтобы увидеть скорость изменения в каждой полосе, внедрив отслеживание объектов.

Отслеживание объектов

Отслеживание объектов — это процесс обнаружения определенного объекта или нескольких объектов в видеопотоке и отслеживания их движения во времени. Целью отслеживания объектов является сохранение идентичности объектов в последовательных кадрах видео, несмотря на изменения положения, ориентации, размера и внешнего вида объекта. Существует множество алгоритмов отслеживания объектов, но для этого конкретного случая использования я решил использовать оптический поток, так как он довольно хорошо работает с потоками с более низкой частотой кадров.

Оптический поток — это метод, который включает анализ изменений интенсивности соседних пикселей в последовательных видеокадрах для определения направления и скорости движения объектов в сцене. Алгоритмы оптического потока основаны на предположении, что яркость пикселя в одном кадре такая же, как и у соответствующего пикселя в следующем кадре, что позволяет рассчитать смещение объектов между кадрами.

Этот алгоритм намного сложнее, поэтому я не буду его подробно объяснять. В качестве параметров он принимает изображение и предыдущее изображение, центры автомобилей из модели YOLOv3, полосы движения, чтобы определить, происходит ли движение в каждой полосе, и автомобили в полосах, чтобы увидеть, является ли движение результатом движения транспортного средства или другие факторы. Он возвращает список скоростей для каждой полосы.

def optical_flow(image, prev_image, features_to_track, lanes, cars_in_lanes):
    if len(cars_in_lanes) == 0:
        return list([-1 for el in range(len(lanes))])
    feature_params = dict(maxCorners=100,
                          qualityLevel=0.3,
                          minDistance=20,
                          blockSize=7)
    lk_params = dict(winSize=(30, 30),
                     maxLevel=5,
                     criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

    prev_gray = cv2.convertScaleAbs(prev_image)
    prev_gray = cv2.cvtColor(prev_gray, cv2.COLOR_BGR2GRAY)
    p0 = np.array(features_to_track).reshape((-1, 1, 2))
    p0 = np.float32(p0)
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Compute optical flow
    p1, st, err = cv2.calcOpticalFlowPyrLK(prev_gray, gray, p0, None, **lk_params)

    # Filter good points
    good_new = p1[st == 1]
    good_prev = p0[st == 1]

    # Find the speed of each lane
    lane_speeds = list([0 for el in range(len(lanes))])
    for i, (new, prev) in enumerate(zip(good_new, good_prev)):
        x_new, y_new = new.ravel()
        x_prev, y_prev = prev.ravel()
        xs_new = []
        xs_old = []
        for lane in lanes:
            xs_new.append(get_x_at_y(lane, y_new))
            xs_old.append(get_x_at_y(lane, y_prev))
        dists_new = []
        dists_old = []
        if None in xs_new or None in xs_old:
            continue
        for x in xs_new:
            dists_new.append(x - x_new)
        for x in xs_old:
            dists_old.append(x - x_prev)
        if dists_new[0] > 0 or dists_new[-1] < 0 or dists_old[0] > 0 or dists_old[-1] < 0:
            continue
        lane_new = None
        lane_old = None
        for j in range(len(dists_new) - 1):
            if dists_new[j] < 0 < dists_new[j + 1]:
                lane_new = j
                break
        for j in range(len(dists_old) - 1):
            if dists_old[j] < 0 < dists_old[j + 1]:
                lane_old = j
                break
        lane_speeds[lane_new] += math.dist([x_new, y_new],[x_prev, y_prev])

    for i, ls in enumerate(lane_speeds):
        cnt = 0
        for car in cars_in_lanes:
            if car[1] == i:
                cnt += 1
        if cnt == 0:
            lane_speeds[i] = -1
            continue
        lane_speeds[i] = lane_speeds[i]/cnt

    return lane_speeds

Выход алгоритма оптического потока сохраняется и сравнивается с выходом предыдущего кадра. При наличии значительного изменения значений оптического потока для каждой дорожки оно фиксируется как 1 балл движения. Это реализовано в следующей функции.

def calc_movement(new_movement, old_movement, sum_movement):
    for i in range(len(new_movement)):
        if new_movement[i] > old_movement[i] + 1:
            sum_movement[i] += 1
    return sum_movement

Теперь, когда все реализовано, мы можем визуализировать результаты. Эта функция показывает изображение с ограничивающими прямоугольниками каждого автомобиля и полосами движения, а также печатает скорость полос в консоли.

def visualize(boxes, centers, class_ids, keep, image, colors, lanes, ccs_with_pos):
    for lane in lanes:
        cv2.polylines(image, [np.array(lane).reshape((-1, 1, 2))], False, (0,0,255), 2)
    for i, (box, c, ci) in enumerate(zip(boxes, centers, class_ids)):
        if i not in keep:
            continue
        class_list = ['car', 'motorbike', 'bus', 'truck', 'person']
        if classes[ci] in class_list:
            color = colors[class_list.index(classes[ci])]
            thickness = 2
            x, y, w, h = box
            cv2.rectangle(image, (x, y), (w, h), color, thickness)

    if ccs_with_pos is not None:
        prev_vehs = list([None for el in range(len(lanes))])
        for i in range(416, 0, -1):
            for car in ccs_with_pos:
                if car[0][1] == i and car[1] != -1:
                    if prev_vehs[car[1]] is None:
                        prev_vehs[car[1]] = car
                        cv2.putText(image, 'lane ' + str(car[1]+1), (car[0][0]-10, car[0][1]+10), cv2.FONT_HERSHEY_SIMPLEX,
                                    0.5, (255, 0, 0), 2, cv2.LINE_AA)
                    else:
                        cv2.line(image, car[0], prev_vehs[car[1]][0], (255,0,0), 2)
                        prev_vehs[car[1]] = car
    cv2.imshow("image", image)

После запуска всего в основном цикле while получаем следующее видео. Программа выводит, что на второй дорожке (дорожке 2) было больше всего движения и, следовательно, она была самой быстрой.

Организовывать все

Хотя алгоритм для обнаружения и отслеживания транспортных средств в одном пограничном переходе завершен, я хотел расширить решение, чтобы оно работало на нескольких пересечениях, сохраняя при этом результаты скорости. Для этого я решил запускать каждый скрипт пограничной камеры как отдельный контейнер, который взаимодействует внутри docker-compose. Поскольку запуск отдельного экземпляра модели YOLOv3 для каждой камеры был бы неэффективен, я создал отдельный контейнер с приложением flask, которое обслуживает другие контейнеры. Однако, чтобы несколько запросов не использовали одну и ту же нейронную сеть одновременно, важно использовать семафоры или другие механизмы блокировки.

Кроме того, я разработал веб-API .Net, который сохраняет данные о скорости и автомобилях на каждой полосе в базе данных. Включение полного кода здесь выходит за рамки этого поста, но вы можете найти его в моем репозитории GitHub. Благодаря этой реализации решение можно развернуть на нескольких пограничных переходах, а полученные данные можно эффективно собирать и анализировать, предоставляя ценную информацию о схемах движения и заторах в разных местах.

version: '3'
services:
  api:
    container_name: api
    build:
      context: ./api
      dockerfile: Dockerfile
    image: api:latest
    ports:
      - 80:80
    restart: always

  yolo:
    build: ./yolo_detector
    command: python script.py
    container_name: yolo
    ports:
      - 5001:5001
    restart: always

  script0:
    build: ./image_processing
    command: python count.py 0
    container_name: script0
    restart: always

  script1:
    build: ./image_processing
    command: python count.py 1
    container_name: script1
    restart: always

  ...

Ограничения

Хотя наш алгоритм доказал свою высокую эффективность, важно признать его ограничения. Во-первых, YOLOv3, модель обнаружения объектов, которую я использовал, менее точна в условиях низкой освещенности, например ночью. В результате количество обнаруженных транспортных средств будет значительно меньше, что может привести к неверным предположениям об объеме трафика. Кроме того, модель с трудом обнаруживает объекты на больших расстояниях, а это означает, что существует ограничение на количество транспортных средств, которые могут быть обнаружены в длинных очередях.

Кроме того, важно учитывать, что пограничные переходы могут существенно различаться по конструкции. Хотя большинство границ могут иметь схожую схему, некоторые из них имеют более сложные особенности, такие как изогнутые полосы движения и дополнительные барьеры. Эти изменения могут сделать алгоритм непригодным для использования в определенных сценариях. Стоит отметить, что дальнейшая разработка и оптимизация потенциально могут устранить эти ограничения, но на данный момент важно учитывать эти факторы при внедрении нашего решения.

Будущая работа

Хотя это решение обеспечивает эффективное средство определения скорости транспортных средств и загруженности дорог на пограничных контрольно-пропускных пунктах, его еще можно улучшить. Одной из областей будущей работы является изучение других моделей обнаружения объектов, чтобы увидеть, обеспечивают ли они повышенную точность в условиях низкой освещенности или при обнаружении объектов на больших расстояниях.

Кроме того, может быть полезно изучить возможность реализации этого алгоритма на пограничных переходах с более сложной планировкой. Это может включать разработку новых алгоритмов для решения проблем, связанных с кривыми полосами движения, дополнительными барьерами или другими особенностями, уникальными для этих переходов.

Кроме того, текущее решение может быть расширено за счет включения методов машинного обучения, которые могут адаптироваться к меняющимся моделям трафика с течением времени. Обучая модель на данных, собранных за длительный период, она потенциально может научиться настраивать свои параметры в зависимости от условий движения, со временем повышая свою точность и надежность.

Наконец, можно интегрировать данные из других источников, таких как социальные сети или камеры наблюдения за дорожным движением, чтобы получить более полное представление о схемах движения на пограничных переходах. Включив данные из ряда источников, можно будет найти корреляции между схемами движения и внешними факторами, такими как погода или общественные события, что даст ценную информацию для управления дорожным движением и планирования на будущее. Эти данные могут помочь людям лучше планировать свои поездки, устанавливая время отправления, когда ожидается наименьшее количество пробок.

Заключение

В заключение, используя алгоритмы машинного зрения, мы разработали эффективное решение для определения скорости транспортных средств и интенсивности движения на пограничных переходах. Вы можете поэкспериментировать с кодом на GitHub и попробовать, какие результаты вы получите с ближайшими к вам погранпереходами. Спасибо за чтение!