Как работать с очередью заданий в kubernetes с масштабированием

Мне нужна масштабируемая обработка очереди на основе docker/python worker. Моя мысль ушла в сторону kubernetes. Однако я не уверен в лучшем контроллере/сервисе.

На основе функций Azure я получаю входящий http-трафик, добавляя простые сообщения в очередь хранилища. Над этими сообщениями нужно работать, а результаты отправлять обратно в очередь результатов.

Для обработки этих сообщений очереди я разработал код Python, зацикливающий очередь и работающий над этими заданиями. После каждого успешного цикла сообщение будет удалено из исходной очереди, а результат записан в очередь результатов. Когда очередь пуста, код существует.

Поэтому я создал образ докера, который запускает код Python. Если запущено более одного контейнера, очередь, очевидно, работает быстрее. Я также реализовал новые службы Azure Kubernetes, чтобы масштабировать это. Будучи новичком в kubernetes, я читал о парадигме работы с очередью, пока задача не будет готова. Мой простой шаблон yaml выглядит так:

apiVersion: batch/v1
kind: Job
metadata:
  name: myjob
spec:
  parallelism: 4
  template:
    metadata:
      name: myjob
    spec:
      containers:
      - name: c
        image: repo/image:tag

Моя проблема сейчас в том, что задание не может быть перезапущено.

Обычно очередь заполняется некоторыми записями, а затем какое-то время ничего не происходит. Затем снова могут появиться большие очереди, которые необходимо обработать как можно быстрее. Конечно, я хочу снова запустить задание, но это кажется невозможным. Кроме того, я хочу свести к минимуму занимаемую площадь, если в очереди ничего нет.

Итак, мой вопрос: какую архитектуру/конструкции следует использовать для этого сценария и есть ли для этого простые примеры yaml?


person Tom Seidel    schedule 04.01.2019    source источник


Ответы (2)


Это может быть «глупый / хакерский» ответ, но он простой, надежный, и я использую его в производственной системе уже несколько месяцев.

У меня есть аналогичная система, где у меня есть очередь, которая иногда опустошается, а иногда захлопывается. Я написал свой обработчик очереди аналогичным образом, он обрабатывает одно сообщение в очереди за раз и завершает работу, если очередь пуста. Он настроен для работы в задании Kubernetes.

Хитрость вот в чем: я создал CronJob для регулярного запуска одного-единственного нового экземпляра задания, и задание допускает бесконечный параллелизм. Если очередь пуста, она немедленно завершается ("уменьшается"). Если очередь заблокирована, а последнее задание еще не завершено, запускается другой экземпляр («масштабируется»).

Не нужно возиться с запросом очереди и масштабированием набора состояний или чем-то еще, и никакие ресурсы не потребляются, если очередь пуста. Возможно, вам придется настроить интервал CronJob, чтобы точно настроить скорость его реакции на заполнение очереди, но он должен реагировать довольно хорошо.

person Michael Pratt    schedule 04.01.2019
comment
Есть ли шанс, что вы могли бы поделиться примерами конфигураций? Как CronJob узнает, что последнее задание еще не завершено? Как указать более высокий уровень параллелизма заданий, если глубина очереди при этой проверке CronJob слишком велика? Концептуально это кажется проще, чем другая информация, которую я находил, но мне интересно, как она настроена. - person jdforsythe; 03.04.2019
comment
Это на самом деле довольно просто. Я вообще не ограничиваю параллелизм (просто не устанавливаю его в спецификации jobTemplate), а вместо этого устанавливаю concurrencyPolicy: "Allow" в спецификации cronjob. Затем установите любой график, по которому вы хотите запустить нового работника, я установил его на каждые 15 минут. Там нет опроса или чего-то подобного, и ему все равно, было ли завершено последнее задание. Он просто запускает новое задание каждые 15 минут, и все они завершаются, когда в очереди не остается элементов. - person Michael Pratt; 05.04.2019
comment
В вашем случае одна работа выполняет всю рабочую нагрузку? В моей ситуации я хотел бы иметь несколько заданий (желательно масштабировать их количество) и распределять нагрузку, выбирая задачи из очереди. Я нашел ваш ответ очень интересным, и мне было интересно, как вы этого добьетесь. Теперь у меня есть развертывание, которое масштабируется с длиной очереди, но вместо этого я хотел бы использовать Jobs и CronJobs (поскольку они эфемарны, но развертывания продолжают перезапускать мою работу, даже с кодом выхода 0) - person Anas Tiour; 26.06.2019
comment
Обычно одно задание обрабатывает очередь в течение часа и завершается, но если оно резервируется и не завершается к следующему расписанию CronJob, CronJob все равно создает другое задание, которое работает параллельно. Таким образом, мне не нужно возиться с масштабированием развертывания вверх и вниз. - person Michael Pratt; 10.08.2019

Это распространенный шаблон, и существует несколько способов разработки решения.

Обычное решение состоит в том, чтобы иметь приложение с набором воркеров, всегда опрашивающих вашу очередь (это может быть ваш скрипт на Python, но вам нужно сделать его службой), и, как правило, вы захотите использовать Kubernetes Развертывание возможно с Horizontal Pod Autoscaler на основе некоторых показателей вашей очереди или ЦП.

В вашем случае вы захотите сделать свой скрипт демоном и опросить очередь, если есть какие-либо элементы (я предполагаю, что вы уже обрабатываете условия гонки с помощью параллелизма). Затем разверните этот демон с помощью развертывания Kubernetes, а затем вы сможете увеличивать и уменьшать масштаб на основе метрик или расписания.

Уже есть планировщики заданий для многих разных языков. Очень популярен Airflow, который уже имеет возможность иметь "рабочих", но это может быть излишество для одного скрипта Python.

person Rico    schedule 04.01.2019
comment
Допустим, мы используем решение Deployment and HPA, где метрикой HPA является длина очереди. Как не допустить, чтобы сокращение масштабов убило активных работников? Например. мы увеличили количество рабочих мест до 10, 5 завершили работу, а HPA сокращает развертывание. Как вы убедитесь, что он убивает 5 рабочих, которые закончили работу, а не тех, которые все еще работают? - person Michal Tenenberg; 25.09.2019
comment
Обычно вы можете справиться с этим с помощью хука preStop, определенного в ваших контейнерах, и с льготным периодом завершения. Подробнее здесь: kubernetes.io/docs/concepts/workloads. /стручки/стручки/. Но да, в HPA нет механизма прекращения действия самой старой политики, аналогичного docs.aws.amazon.com/autoscaling/ec2/userguide/ (политика завершения ASG в AWS). Это может быть запрос функции :) - person Rico; 26.09.2019
comment
Есть запрос функции, но он открыт с 2017 года... github.com/kubernetes/kubernetes /вопросы/45509 - person Michal Tenenberg; 27.09.2019
comment
Спасибо, что поделились, я добавил комментарий к тикету. - person Rico; 27.09.2019