Azure: как переместить сообщения из подозрительной очереди обратно в основную очередь?

Мне интересно, есть ли инструмент или библиотека, которая может перемещать сообщения между очередями? В настоящее время я делаю что-то вроде ниже

public static void ProcessQueueMessage([QueueTrigger("myqueue-poison")] string message, TextWriter log)
{
    CloudStorageAccount storageAccount = CloudStorageAccount.Parse(connString);
    CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
    CloudQueue queue = queueClient.GetQueueReference("myqueue");
    queue.CreateIfNotExists();

    var messageData = JsonConvert.SerializeObject(data, new JsonSerializerSettings { ContractResolver = new CamelCasePropertyNamesContractResolver() });
    queue.AddMessage(new CloudQueueMessage(messageData));
}

person vbn    schedule 21.10.2015    source источник


Ответы (9)


По состоянию на (2018-09-11) версия 1.4.1 Microsoft Azure Storage Explorer не может перемещать сообщения из одной очереди Azure в другую.

Я записал простое решение для передачи подозрительных сообщений обратно в исходную очередь и подумал, что это может сэкономить кому-то несколько минут. Очевидно, вам нужно будет исправить ошибку, из-за которой сообщения попадали в очередь подозрительных сообщений!

Вам потребуется добавить ссылку на пакет NuGet в Microsoft.NET.Sdk.Functions. :

using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Queue;

void Main()
{
    const string queuename = "MyQueueName";

    string storageAccountString = "xxxxxx";

    RetryPoisonMesssages(storageAccountString, queuename);
}

private static int RetryPoisonMesssages(string storageAccountString, string queuename)
{
    CloudQueue targetqueue = GetCloudQueueRef(storageAccountString, queuename);
    CloudQueue poisonqueue = GetCloudQueueRef(storageAccountString, queuename + "-poison");

    int count = 0;
    while (true)
    {
        var msg = poisonqueue.GetMessage();
        if (msg == null)
            break;

        poisonqueue.DeleteMessage(msg);
        targetqueue.AddMessage(msg);
        count++;
    }

    return count;
}

private static CloudQueue GetCloudQueueRef(string storageAccountString, string queuename)
{
    CloudStorageAccount storageAccount = CloudStorageAccount.Parse(storageAccountString);
    CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
    CloudQueue queue = queueClient.GetQueueReference(queuename);

    return queue;
}
person Mitch Wheat    schedule 11.09.2018
comment
Большое спасибо. Если вы запускаете это из консоли, достаточно добавить WindowsAzure.Storage. Однако вам нужно будет сделать вызовы GetMessage, DeleteMessage и AddMessage асинхронными, поскольку эти неасинхронные методы были удалены из библиотеки. - person Knelis; 10.10.2018

По сути, служба хранилища Azure не поддерживает перемещение сообщений из одной очереди в другую. Вам нужно будет сделать это самостоятельно.

Одним из способов реализации перемещения сообщений из одной очереди в другую является удаление сообщений из исходной очереди (путем вызова GetMessages), чтение содержимого сообщения и последующее создание нового сообщения в целевой очереди. Это можно сделать с помощью клиентской библиотеки хранилища.

Мне приходит на ум один из инструментов для перемещения сообщений: Cerebrata Azure Management Studio (платный продукт с 15-дневным бесплатная пробная версия). Он имеет этот функционал.

По состоянию на 11 сентября 2018 г., версия 1.4.1 Microsoft Azure Обозреватель службы хранилища не поддерживает перемещение сообщений очереди.

person Gaurav Mantri    schedule 21.10.2015
comment
Azure Storage Explorer на момент написания не поддерживает это. - person Sentinel; 12.06.2017
comment
@MitchWheat ... Короче говоря ... Я продал Cerebrata компании RedGate в 2011 году и приобрел ее в апреле 2016 года. Когда я опубликовал этот ответ в 2015 году, я не был связан с Cerebrata, и поэтому я не добавил раскрытие: ). Надеюсь, это ответит на ваше беспокойство. - person Gaurav Mantri; 11.09.2018

Вот обновленная версия ответа Митча с использованием последнего пакета Microsoft.Azure.Storage.Queue. Просто создайте новое консольное приложение .NET, добавьте в него вышеупомянутый пакет и замените содержимое Program.cs следующим:

using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Queue;
using System.Threading.Tasks;

namespace PoisonMessageDequeuer
{
    class Program
    {
        static async Task Main(string[] args)
        {
            const string queuename = "MyQueueName";

            string storageAccountString = "xxx";

            await RetryPoisonMesssages(storageAccountString, queuename);
        }

        private static async Task<int> RetryPoisonMesssages(string storageAccountString, string queuename)
        {
            var targetqueue = GetCloudQueueRef(storageAccountString, queuename);
            var poisonqueue = GetCloudQueueRef(storageAccountString, queuename + "-poison");

            var count = 0;
            while (true)
            {
                var msg = await poisonqueue.GetMessageAsync();
                if (msg == null)
                    break;

                await poisonqueue.DeleteMessageAsync(msg);
                await targetqueue.AddMessageAsync(msg);
                
                count++;
            }

            return count;
        }

        private static CloudQueue GetCloudQueueRef(string storageAccountString, string queuename)
        {
            var storageAccount = CloudStorageAccount.Parse(storageAccountString);
            var queueClient = storageAccount.CreateCloudQueueClient();
            var queue = queueClient.GetQueueReference(queuename);

            return queue;
        }
    }
}

Тем не менее, это все еще довольно медленно, если вы работаете с 1000 сообщений, поэтому я бы рекомендовал изучить пакетные API для больших объемов.

person IGx89    schedule 09.09.2019
comment
Я бы рекомендовал изменить порядок DeleteMessageAsync и AddMessageAsync, чтобы, если что-то пойдет не так, вы получили дубликат сообщения вместо потерянного сообщения? - person Andreas Öhlund; 29.10.2019
comment
Хорошее предложение! В моем случае сообщения очереди вызывают некритические электронные письма, поэтому я бы предпочел, чтобы сообщение было удалено, а не дублировано. - person IGx89; 30.10.2019
comment
Да, варианты использования могут различаться, но я бы сказал, что безопасным по умолчанию является получение дубликата, поскольку в облаке вам в значительной степени нужно сделать всю свою обработку идемпотентной, поскольку лучшая гарантия, которую вы можете получить, — это «по крайней мере один раз». Возможно, комментарий в коде, чтобы люди хотя бы знали о решении? - person Andreas Öhlund; 30.10.2019
comment
Я надеюсь, что люди не слепо копировали и запускали мой код, это просто пример того, как добиться этого с помощью новейших API, но, вероятно, все же разумно это сделать, я полагаю. Готово! - person IGx89; 31.10.2019
comment
Просто примечание. Этот код не работает в последней версии учетной записи хранения. Причина в том, что AddMessageAsync перезапишет некоторую информацию в сообщении, а затем DeleteMessagAsync выдаст ошибку 404. Лучшее решение — скопировать значения в новое сообщение для AddMessageAsync. - person Mikael Eliasson; 13.12.2019
comment
@MikaelEliasson, я опубликовал обновленную функцию на основе вашего предложения Azure: как переместить сообщения из очереди отравления обратно в основную очередь? - person Michael Freidgeim; 18.01.2020
comment
Мне пришлось снова запустить этот код, и я сам столкнулся с проблемой. Я вернул код к своему исходному ответу, так как он работает (и отлично работает для моего варианта использования). Если люди хотят, чтобы он был более надежным, они могут использовать код из ответа Майкла. - person IGx89; 30.07.2020

Azure Storage Explorer версии 1.15.0 теперь может делать это с 2020 года. /Microsoft/AzureStorageExplorer/issues/1064

person Tanya Branagan    schedule 23.02.2021

Вот скрипт Python, который может оказаться полезным. Вам нужно установить azure-storage-queue

queueService = QueueService(connection_string = "YOUR CONNECTION STRING")
for queue in queueService.list_queues():
  if "poison" in queue.name:
    print(queue.name)
    targetQueueName = queue.name.replace("-poison", "")
    while queueService.peek_messages(queue.name):
      for message in queueService.get_messages(queue.name, 32):
        print(".", end="", flush=True)
        queueService.put_message(targetQueueName, message.content)
        queueService.delete_message(queue.name, message.id, message.pop_receipt)
person Jon Canning    schedule 27.02.2019
comment
Я получаю эту ошибку: Исключение: [AttributeError] (объект QueueServiceClient не имеет атрибута peek_messages) - person Andrew; 26.06.2020

Мне просто нужно было сделать это снова, и я нашел время, чтобы обновить свои фрагменты до новых SDK для хранения. См. сообщение по адресу https://www.bokio.se/engineering-blog/how-to-re-run-the-poison-queue-in-azure-webjobs/ для получения дополнительной информации.

Вот код, который я использовал

using Azure.Storage.Queues;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace AzureQueueTransfer
{
    internal class Program
    {
        // Need Read, Update & Process (full url, can create in storage explorer)
        private const string sourceQueueSAS = ""; 

        // Need Add (full url, can create in storage explorer)
        private const string targetQueueSAS = "";
        private static async Task Main(string[] args)
        {
            var sourceQueue = new QueueClient(new Uri(sourceQueueSAS));
            var targetQueue = new QueueClient(new Uri(targetQueueSAS));

            var queuedAny = true;
            while (queuedAny)
            {
                Thread.Sleep(30000); // Sleep to make sure we dont build too much backlog so we can process new messages on higher prio than old ones
                queuedAny = false;
                foreach (var message in sourceQueue.ReceiveMessages(maxMessages: 32).Value)
                {
                    queuedAny = true;
                    var res = await targetQueue.SendMessageAsync(message.Body);

                    Console.WriteLine($"Transfered: {message.MessageId}");
                    await sourceQueue.DeleteMessageAsync(message.MessageId, message.PopReceipt);
                }

                Console.WriteLine($"Finished batch");
            } 
        }
    }
}

person Mikael Eliasson    schedule 26.01.2021

Всем, кто приходит сюда в поисках эквивалента Node ответа @MitchWheats, используя функцию Azure.

import AzureStorage from 'azure-storage'
import { Context, HttpRequest } from '@azure/functions'
import util from 'util'

const queueService = AzureStorage.createQueueService()
queueService.messageEncoder = new AzureStorage.QueueMessageEncoder.TextBase64QueueMessageEncoder()

const deleteMessage = util.promisify(queueService.deleteMessage).bind(queueService)
const createMessage = util.promisify(queueService.createMessage).bind(queueService)
const getMessage = util.promisify(queueService.getMessage).bind(queueService)

export async function run (context: Context, req: HttpRequest): Promise<void> {
  try {
    const poisonQueue = (req.query.queue || (req.body && req.body.queue));
    const targetQueue = poisonQueue.split('-')[0]

    let count = 0

    while (true) {
      const message = await getMessage(poisonQueue)
      if (!message) { break; }
      if (message.messageText && message.messageId && message.popReceipt) {
        await createMessage(targetQueue, message.messageText)
        await deleteMessage(poisonQueue, message.messageId, message.popReceipt)
      }
      count++
    }

    context.res = {
      body: `Replayed ${count} messages from ${poisonQueue} on ${targetQueue}`
    };
  } catch (e) {
    context.res = { status: 500 }
  }
}

Чтобы использовать функцию, вам необходимо предоставить информацию о подключении для учетной записи хранения, используемой для ваших очередей хранилища. Это предоставляется как переменные среды. Либо вы предоставляете AZURE_STORAGE_ACCOUNT и AZURE_STORAGE_ACCESS_KEY, либо AZURE_STORAGE_CONNECTION_STRING. Подробнее об этом можно узнать в документах SDK Azure Storage.

Также написал несколько строк об этом в этом Статья среднего уровня

person Christofer Eliasson    schedule 20.06.2019

Обновленный python на основе ответа Джона Каннинга:

from azure.storage.queue import QueueServiceClient


queueService = QueueServiceClient.from_connection_string(conn_str="DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>;EndpointSuffix=core.windows.net")

for queue in queueService.list_queues():
  if "poison" in queue.name:
    print(queue.name)
    targetQueueName = queue.name.replace("-poison", "")
    queue = queueService.get_queue_client(queue=queue.name)
    targetQueue = queueService.get_queue_client(queue=targetQueueName)
    while queue.peek_messages() :
        messages = queue.receive_messages()
        for msg in messages:
            targetQueue.send_message(msg.content)
            queue.delete_message(msg)            
person Ran Mansoor    schedule 01.11.2020

Как Микаэль Элиассон отметил, что код в ответе IGx89 не работает, потому что

AddMessageAsync перезапишет некоторую информацию в сообщении, а затем DeleteMessagAsync выдаст ошибку 404. Лучшее решение — скопировать значения в новое сообщение для AddMessageAsync.

Пожалуйста, смотрите расширенную версию RetryPoisonMesssages с возможностью указывать только список сообщений (а не все в очереди) и позволять копировать сообщения, а не перемещать их. Он также регистрирует успех/неудача для каждого сообщения.

/// <param name="storageAccountString"></param>
/// <param name="queuename"></param>
/// <param name="idsToMove">If not null, only messages with listed IDs will be moved/copied</param>
/// <param name="deleteFromPoisonQueue">if false,  messages will be copied; if true, they will be moved
///Warning: if queue is big, keeping deleteFromPoisonQueue=false can cause the same row 
///from poisonqueue to be copied more than once(the reason is not found yet)</param>
/// <returns></returns>
private static async Task<int> RetryPoisonMesssages(string storageAccountString, string queuename, string[] idsToMove=null, bool deleteFromPoisonQueue=false)
{
    var targetqueue = GetCloudQueueRef(storageAccountString, queuename);
    var poisonQueueName = queuename + "-poison";
    var poisonqueue = GetCloudQueueRef(storageAccountString, poisonQueueName);

    var count = 0;
    while (true)
    {
        var msg = await poisonqueue.GetMessageAsync();
        if (msg == null)
        {
            Console.WriteLine("No more messages in a queue " + poisonQueueName);
            break;
        }

        string action = "";
        try
        {
            if (idsToMove == null || idsToMove.Contains(msg.Id))
            {
                var msgToAdd = msg;
                if (deleteFromPoisonQueue)
                {
                    //The reason is that AddMessageAsync will overwrite some info on the message and then DeleteMessagAsync will give a 404.
                    //The better solution is to copy the values into a new message for AddMessageAsync 
                     msgToAdd = new CloudQueueMessage(msg.AsBytes);
                }

                action = "adding";
                await targetqueue.AddMessageAsync(msgToAdd);
                Console.WriteLine(action + " message ID " + msg.Id);
                if (deleteFromPoisonQueue)
                {
                    action = "deleting";
                    await poisonqueue.DeleteMessageAsync(msg);
                }
                Console.WriteLine(action + " message ID " + msg.Id);
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine("Error encountered when "+ action + " " + ex.Message + " at message ID " + msg.Id);
        }

        count++;
    }

    return count;
}
person Michael Freidgeim    schedule 18.01.2020