у меня следующий сценарий
- I am writing a server that process files (jobs)
- a file has a "prefix" and a time
- файлы должны обрабатываться по времени (сначала старые файлы), но также учитывать префикс (файлы с одинаковым префиксом не могут обрабатываться одновременно)
- У меня есть поток (задача с таймером), который наблюдает за каталогом и добавляет файлы в «очередь» (производитель)
- I have several consumers that take the file from "queue" (consumer) - they should conform to the above rules.
- the job of each task is kept in some list (this indicates the constraints)
- Потребителей несколько, количество потребителей определяется при запуске.
Одним из требований является возможность корректно останавливать потребителей (немедленно или позволить текущим процессам завершиться).
Я сделал что-то в этом направлении:
while (processing)
{
//limits number of concurrent tasks
_processingSemaphore.Wait(queueCancellationToken);
//Take next job when available or wait for cancel signal
currentwork = workQueue.Take(taskCancellationToken);
//check that it can actually process this work
if (CanProcess(currnetWork)
{
var task = CreateTask(currentwork)
task.ContinueWith((t) => { //release processing slot });
}
else
//release slot, return job? something else?
}
Источники токенов отмены находятся в коде вызывающей стороны и могут быть отменены. Их два, чтобы можно было остановить очередь, не отменяя запущенные задачи.
Я устал реализовывать «очередь» как BlockingCollection, обертывающую «безопасный» SortedSet. Общая идея работает (упорядочение по времени), за исключением случая, когда мне нужно найти новую работу, соответствующую ограничению. Если я верну задание в очередь и попытаюсь взять еще раз, то получу такое же.
Можно брать задания из очереди, пока я не найду подходящее, а затем вернуть «незаконные» задания обратно, но это может вызвать проблемы с другими потребителями, обрабатывающими неупорядоченные задания.
Другая альтернатива — передать простую коллекцию и способ ее блокировки, а затем просто заблокировать и выполнить простой поиск в соответствии с текущими ограничениями. Опять же, это означает написание кода, который, возможно, не будет потокобезопасным.
Любые другие предложения/указатели/структуры данных, которые могут помочь?