Эффективные задачи сигнализации для завершения TPL при часто повторяющихся событиях

Я работаю над системой моделирования, которая, среди прочего, позволяет выполнять задачи в дискретных симулированных временных шагах. Все выполнение происходит в контексте потока моделирования, но с точки зрения «оператора», использующего систему, они хотят вести себя асинхронно. К счастью, TPL с удобными ключевыми словами «async/await» делает это довольно просто. У меня есть примитивный метод моделирования, например:

    public Task CycleExecutedEvent()
    {
        lock (_cycleExecutedBroker)
        {
            if (!IsRunning) throw new TaskCanceledException("Simulation has been stopped");
            return _cycleExecutedBroker.RegisterForCompletion(CycleExecutedEventName);
        }
    }

Это в основном создание нового TaskCompletionSource, а затем возврат Task. Целью этой задачи является выполнение ее продолжения, когда возникает новый «ExecuteCycle» в моделировании.

Затем у меня есть некоторые методы расширения, подобные этому:

    public static async Task WaitForDuration(this ISimulation simulation, double duration)
    {
        double startTime = simulation.CurrentSimulatedTime;
        do
        {
            await simulation.CycleExecutedEvent();
        } while ((simulation.CurrentSimulatedTime - startTime) < duration);
    }

    public static async Task WaitForCondition(this ISimulation simulation, Func<bool> condition)
    {
        do
        {
            await simulation.CycleExecutedEvent();
        } while (!condition());
    }

Это очень удобно для построения последовательностей с точки зрения «оператора», выполнения действий в зависимости от условий и ожидания периодов симулированного времени. Проблема, с которой я сталкиваюсь, заключается в том, что CycleExecuted возникает очень часто (примерно каждые несколько миллисекунд, если я работаю на полной скорости). Поскольку эти вспомогательные методы «ожидания» регистрируют новое «ожидание» в каждом цикле, это приводит к большому обороту экземпляров TaskCompletionSource.

Я профилировал свой код и обнаружил, что примерно 5,5% моего общего времени ЦП тратится на эти завершения, из которых лишь незначительный процент тратится на «активный» код. По сути, все время тратится на регистрацию новых завершений в ожидании выполнения условий срабатывания.

Мой вопрос: как я могу улучшить производительность, сохраняя при этом удобство шаблона async/await для написания «поведения оператора»? Я думаю, мне нужно что-то вроде более легкого и/или многоразового TaskCompletionSource, учитывая, что инициирующее событие происходит так часто.


Я провел немного больше исследований, и мне кажется, что хорошим вариантом было бы создать пользовательскую реализацию шаблона Awaitable, которая могла бы напрямую привязываться к событию, устраняя необходимость в связке экземпляров TaskCompletionSource и Task. Причина, по которой это может быть полезно здесь, заключается в том, что существует множество различных продолжений, ожидающих CycleExecutedEvent, и они должны ожидать его часто. Поэтому в идеале я ищу способ просто поставить в очередь обратные вызовы продолжения, а затем вызывать все в очереди всякий раз, когда происходит событие. Я буду продолжать копать, но я буду рад любой помощи, если люди знают чистый способ сделать это.


Для тех, кто просматривает этот вопрос в будущем, вот пользовательский ожидающий, который я собрал:

public sealed class CycleExecutedAwaiter : INotifyCompletion
{
    private readonly List<Action> _continuations = new List<Action>();

    public bool IsCompleted
    {
        get { return false; }
    }

    public void GetResult()
    {
    }

    public void OnCompleted(Action continuation)
    {
        _continuations.Add(continuation);
    }

    public void RunContinuations()
    {
        var continuations = _continuations.ToArray();
        _continuations.Clear();
        foreach (var continuation in continuations)
            continuation();
    }

    public CycleExecutedAwaiter GetAwaiter()
    {
        return this;
    }
}

И в Симуляторе:

    private readonly CycleExecutedAwaiter _cycleExecutedAwaiter = new CycleExecutedAwaiter();

    public CycleExecutedAwaiter CycleExecutedEvent()
    {
        if (!IsRunning) throw new TaskCanceledException("Simulation has been stopped");
        return _cycleExecutedAwaiter;
    }

Это немного забавно, так как ожидающий никогда не сообщает о завершении, но пожары продолжают вызывать завершения по мере их регистрации; тем не менее, это хорошо работает для этого приложения. Это снижает нагрузку на ЦП с 5,5% до 2,1%. Скорее всего, он все еще потребует некоторой настройки, но это хорошее улучшение по сравнению с оригиналом.


person Dan Bryant    schedule 27.06.2012    source источник
comment
Эй, нечестно отвечать на свой вопрос за минуту до меня! :-)   -  person svick    schedule 27.06.2012
comment
@svick, на него еще не дан полный ответ; Мне все еще нужно выяснить, как создать пользовательский ожидающий. :) Спасибо за ссылки; они весьма полезны.   -  person Dan Bryant    schedule 27.06.2012
comment
@DanBryant: вы должны ответить на свой вопрос ... как ответ, а не в теле своего вопроса.   -  person user7116    schedule 28.06.2012
comment
По общему признанию, просто пропускаю вопрос, но с учетом того, что событие происходит несколько раз, и компонент, основанный на времени, я думаю, что Rx 2.0 подойдет здесь. Задача хорошо работает для одноразового использования, но наблюдаемая лучше работает (ИМХО) для «потоков» событий, как у вас здесь. Сообщение в блоге команды Rx здесь очень полезно для понимания того, когда оно подходит: blogs.msdn.com/b/rxteam/archive/2012/03/12/   -  person James Manning    schedule 28.06.2012
comment
На самом деле возврат всегда false для IsCompleted - это то, что TaskEx.Yield также делает для получения контроля. Я не совсем уверен в вашем другом коде, но использование однопоточного TaskScheduler и Yield похоже на то, что делает ваш код...   -  person sanosdole    schedule 06.02.2014
comment
@sanosdole, я думаю, ты прав; пользовательская реализация TaskScheduler, вероятно, также будет работать здесь.   -  person Dan Bryant    schedule 06.02.2014


Ответы (3)


Ключевое слово await работает не только с Task, оно работает со всем, что соответствует ожидаемому шаблону. Подробности см. в статье Стивена Туба wait any; .

Краткая версия заключается в том, что тип должен иметь метод GetAwaiter(), который возвращает тип, реализующий INotifyCompletion, а также имеет IsCompleted свойство и GetResult() метод (void-возврат, если выражение await не должно иметь значения). Пример см. в разделе TaskAwaiter.

Если вы создадите свой собственный ожидаемый объект, вы можете каждый раз возвращать один и тот же объект, избегая накладных расходов на выделение множества TaskCompletionSources.

person svick    schedule 27.06.2012
comment
Я построил обычай ожидания, как вы предложили; это намного быстрее, хотя я уверен, что еще есть возможности для улучшения. Главное, что меня беспокоит, это то, что он довольно «дырявый» (внешние классы могут захватить экземпляр Awaiter и вызвать RunContinuations, чтобы запустить его вне ожидаемого контекста). - person Dan Bryant; 28.06.2012
comment
@DanBryant Я не уверен, что ты можешь что-то с этим поделать. Если вы дадите кому-то способ запланировать продолжение (а это то, что вам нужно), он всегда сможет использовать его вне ожидаемого контекста. - person svick; 28.06.2012

Вот моя версия ReusableAwaiter, имитирующая TaskCompletionSource

public sealed class ReusableAwaiter<T> : INotifyCompletion
{
    private Action _continuation = null;
    private T _result = default(T);
    private Exception _exception = null;

    public bool IsCompleted
    {
        get;
        private set;
    }

    public T GetResult()
    {
        if (_exception != null)
            throw _exception;
        return _result;
    }

    public void OnCompleted(Action continuation)
    {
        if (_continuation != null)
            throw new InvalidOperationException("This ReusableAwaiter instance has already been listened");
        _continuation = continuation;
    }

    /// <summary>
    /// Attempts to transition the completion state.
    /// </summary>
    /// <param name="result"></param>
    /// <returns></returns>
    public bool TrySetResult(T result)
    {
        if (!this.IsCompleted)
        {
            this.IsCompleted = true;
            this._result = result;

            if (_continuation != null)
                _continuation();
            return true;
        }
        return false;
    }

    /// <summary>
    /// Attempts to transition the exception state.
    /// </summary>
    /// <param name="result"></param>
    /// <returns></returns>
    public bool TrySetException(Exception exception)
    {
        if (!this.IsCompleted)
        {
            this.IsCompleted = true;
            this._exception = exception;

            if (_continuation != null)
                _continuation();
            return true;
        }
        return false;
    }

    /// <summary>
    /// Reset the awaiter to initial status
    /// </summary>
    /// <returns></returns>
    public ReusableAwaiter<T> Reset()
    {
        this._result = default(T);
        this._continuation = null;
        this._exception = null;
        this.IsCompleted = false;
        return this;
    }

    public ReusableAwaiter<T> GetAwaiter()
    {
        return this;
    }
}

А вот и тестовый код.

class Program
{
    static readonly ReusableAwaiter<int> _awaiter = new ReusableAwaiter<int>();

    static void Main(string[] args)
    {
        Task.Run(() => Test());

        Console.ReadLine();
        _awaiter.TrySetResult(22);
        Console.ReadLine();
        _awaiter.TrySetException(new Exception("ERR"));

        Console.ReadLine();
    }

    static async void Test()
    {

        int a = await AsyncMethod();
        Console.WriteLine(a);
        try
        {
            await AsyncMethod();
        }
        catch(Exception ex)
        {
            Console.WriteLine(ex.Message);
        }

    }

    static  ReusableAwaiter<int> AsyncMethod()
    {
        return _awaiter.Reset();
    }

}
person Mr.Wang from Next Door    schedule 19.11.2016

Вам действительно нужно получать событие WaitForDuration в другом потоке? Если нет, вы можете просто зарегистрировать обратный вызов (или событие) с помощью _cycleExecutedBroker и синхронно получать уведомление. В обратном вызове вы можете проверить любое условие, которое вам нравится, и только если это условие окажется истинным, уведомить другой поток (используя задачу, сообщение или любой другой механизм). Я понимаю, что условие, которое вы проверяете, редко оценивается как истинное, поэтому таким образом вы избегаете большинства вызовов между потоками.

Я предполагаю, что суть моего ответа такова: попытайтесь уменьшить объем обмена сообщениями между потоками, переместив вычисления в «исходный» поток.

person usr    schedule 27.06.2012
comment
На самом деле это не пересечение границ потоков; поскольку TaskCompletionSource помечен как выполненный в потоке моделирования, все завершения выполняются в потоке моделирования. Весь смысл использования async/await в этом случае заключается в том, чтобы все задачи выполнялись в одном и том же потоке с контролируемой «уступкой» для проверки только один раз за симулированный цикл выполнения. - person Dan Bryant; 27.06.2012
comment
Хорошо, у задач все еще есть накладные расходы, которых нет у простых событий. В течение своего жизненного цикла им требуется несколько взаимосвязанных операций, которые являются аппаратными блокировками и очень дороги. Они являются системным глобальным ресурсом. - person usr; 28.06.2012