Создание простого потребителя сообщений с помощью gocron

(Это последняя часть из 3-х частей. Предыдущую часть можно прочитать здесь)

Краткое содержание предыдущей главы

В предыдущей главе мы создали нашу функцию производителя сообщений enqueue. Функция enqueue вызывается, когда наш сервер приложений получает трафик и публикует сообщение в RabbitMQ. В зависимости от вашей конфигурации вы можете вызвать вызов функции enqueue() в другом месте.

Создание потребителя

Далее мы создадим задание cron, которое будет запускаться каждый час/минуту (в зависимости от ваших предпочтений) и удалять сообщения из нашей очереди. Для простоты, когда сообщение будет использовано, мы отправим базовое электронное письмо с уведомлением, используя пакет Mailgun go.

(Вы можете найти репо простого потребителя здесь)

В этом примере рабочий будет работать в фоновом режиме и каждые 2 минуты будет выполнять processMessages функцию. Мы поговорим о настройке этого воркера для работы в качестве systemdservice позже.

Подобно предыдущему примеру, где мы создаем Connection и Channel, чтобы иметь возможность выполнять задачи в нашей очереди.

amqpServerURL := os.Getenv("AMQP_SERVER_URL")
	connectRabbitMQ, err := amqp.Dial(amqpServerURL)
	if err != nil {
		panic(err)
	}
	defer connectRabbitMQ.Close()
	channelRabbitMQ, err := connectRabbitMQ.Channel()
	if err != nil {
		panic(err)
	}
	defer channelRabbitMQ.Close()

Мы будем использовать функцию Consume() на нашем канале для получения сообщений. Вы можете увидеть комментарии к вариантам. Для простоты мы будем автоматически подтверждать сообщения по мере их использования.

messages, err := channelRabbitMQ.Consume(
		"FallbackAPIQueue",
		"",
		true,                      // auto acknowledge
		false,                     // exclusive
		false,                     // no local
		false,                     // no wait
		nil,
	)
	if err != nil {
		log.Println(err)
	}
	log.Println("You've connected to RabbitMQ")
	log.Println("Waiting for messages")

	if len(messages) == 0 {
		log.Println("You don't have any messages in the queue")
	}
	for message := range messages {
		if string(message.Body) == "A request has been sent via fallback API" {
			sendSimpleMessage()
		}
	}

Наконец, для каждого сообщения, которое мы потребляем, мы будем вызывать sendSimpleMessage()

Используя клиентскую библиотеку Go Mailgun mailgun-go, мы можем отправлять простые электронные письма. Для этого вам потребуется учетная запись в Mailgun. Вам понадобится ваш домен Mailgun и ключ API, сохраненные в переменных среды. В разделе ресурсов ниже есть ссылка, если у вас нет учетной записи Mailgun.

func sendSimpleMessage() {
	api_key := os.Getenv("MAILGUN_API_KEY")
	sandbox_domain := os.Getenv("MAILGUN_DOMAIN")

	mg := mailgun.NewMailgun(sandbox_domain, api_key)
	sender := "[email protected]"
	subject := "security warning"
	body := "Your fallback api is exposed"
	recipient := "<YOUR_EMAIL>"
	message := mg.NewMessage(sender, subject, body, recipient)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()
	resp, id, err := mg.Send(ctx, message)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("ID: %s Resp: %s\\n", id, resp)
}

Теперь давайте настроим systemd для запуска этого приложения как службы.

systemd — это система инициализации и системный менеджер, входящая в состав большинства дистрибутивов Linux. Он поставляется с инструментом управления systemctl

Если вы хотите узнать больше о systemd или если вы новичок в этом, вы можете найти ссылку на учебник ниже в ресурсах. Вернемся к созданию нашего сервиса.

Сначала мы создадим сервисный файл:

$ sudo vim /lib/systemd/system/simple_consumer.service

Далее мы добавим следующие параметры конфигурации в наш simple_consumer.service:

Прежде чем мы двинемся дальше, давайте поговорим о некоторых из этих шагов настройки:

  1. В строке 3 требуется ConditionPathExists, чтобы убедиться, что каталог нашего приложения присутствует на нашем сервере, прежде чем он запустится.
  2. After в строке 6 указывает systemd подождать, пока не будет запущена служба postgresql. Это необходимо, потому что я интегрирую это в свой простой сервер API. Это может не потребоваться в зависимости от вашей конфигурации.
  3. В строке 12 мы указываем WorkingDirectory. Это важно для получения переменных среды из нашего файла .env.
  4. Наконец, ExecStart указывает расположение нашего исполняемого файла.

Прежде чем мы включим нашу систему, нам нужно дать соответствующие разрешения для запуска нашего приложения:

$ sudo chmod +x /opt/appDir/simple_consumer

Теперь с помощью systemctl мы можем сначала включить службу и запустить ее. Мы можем проверить статус приложения с помощью последней команды:

$ sudo systemctl enable simple_consumer      # Enable the service
$ sudo systemctl start simple_consumer       # Start the service
$ sudo systemctl status simple_consumer      # Check service status

Теперь давайте запустим enqueue пару раз и посмотрим нашу очередь до и после запуска cron-job:

$ sudo rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
FallbackAPIQueue 3

Число рядом с FallbackAPIQueue должно уменьшиться до 0, и в нашем почтовом ящике у нас должно быть электронное письмо для каждого сообщения, которое исключено из нашей очереди 🎊

Ресурсы