Как правильно выйти из std::thread, который может ожидать std::condition_variable?

У меня есть класс, который реализует многопоточную систему производителя/потребителя, используя мьютекс и две переменные условия для синхронизации. Производитель сигнализирует потоку-потребителю, когда есть элементы для использования, а потребитель сигнализирует потоку-производителю, когда он потребляет элементы. Потоки продолжают производить и потреблять до тех пор, пока деструктор не попросит их выйти, установив логическую переменную. Поскольку любой из потоков может ожидать переменную условия, я должен реализовать вторую проверку переменной quit, что кажется неправильным и беспорядочным...

Я сократил проблему до следующего примера (работа над GNU/Linux с g++4.7):

// C++11and Boost required.
#include <cstdlib> // std::rand()
#include <cassert>

#include <boost/circular_buffer.hpp>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

// Creates a single producer and single consumer thread.
class prosumer
{
    public:
        // Create the circular buffer and start the producer and consumer thread.
        prosumer()
            : quit_{ false }
            , buffer_{ circular_buffer_capacity }
            , producer_{ &prosumer::producer_func, this }
            , consumer_{ &prosumer::consumer_func, this }
        {}

        // Set the quit flag and wait for the threads to exit.
        ~prosumer()
        {
            quit_ = true;
            producer_.join();
            consumer_.join();
        }

    private:
        // Thread entry point for the producer.
        void producer_func()
        {
            // Value to add to the ringbuffer to simulate data.
            int counter = 0;

            while ( quit_ == false )
            {
                // Simulate the production of some data.
                std::vector< int > produced_items;
                const auto items_to_produce = std::rand() % circular_buffer_capacity;
                for ( int i = 0; i < items_to_produce; ++i )
                {
                    produced_items.push_back( ++counter );
                }

                // Get a lock on the circular buffer.
                std::unique_lock< std::mutex > lock( buffer_lock_ );

                // Wait for the buffer to be emptied or the quit flag to be set.
                buffer_is_empty_.wait( lock, [this]()
                        {
                            return buffer_.empty() == true || quit_ != false;
                        } );

                // Check if the thread was requested to quit.
                if ( quit_ != false )
                {
                    // Don't let the consumer deadlock.
                    buffer_has_data_.notify_one();
                    break;
                }

                // The buffer is locked by this thread. Put the data into it.
                buffer_.insert( std::end( buffer_ ), std::begin( produced_items ), std::end( produced_items ) );

                // Notify the consumer that the buffer has some data in it.
                buffer_has_data_.notify_one();
            }
            std::cout << "producer thread quit\n";
        }


        // Thread entry for the consumer.
        void consumer_func()
        {
            int counter_check = 0;

            while ( quit_ == false )
            {
                std::unique_lock< std::mutex > lock( buffer_lock_ );

                // Wait for the buffer to have some data before trying to read from it.
                buffer_has_data_.wait( lock, [this]()
                        {
                            return buffer_.empty() == false || quit_ != false;
                        } );

                // Check if the thread was requested to quit.
                if ( quit_ != false )
                {
                    // Don't let the producer deadlock.
                    buffer_is_empty_.notify_one();
                    break;
                }

                // The buffer is locked by this thread. Simulate consuming the data.
                for ( auto i : buffer_ ) assert( i == ++counter_check );
                buffer_.clear();

                // Notify the producer thread that the buffer is empty.
                buffer_is_empty_.notify_one();
            }
            std::cout << "consumer thread quit\n";
        }

        // How many items the circular buffer can hold. 
        static const int circular_buffer_capacity = 64;

        // Flag set in the destructor to signal the threads to stop.
        std::atomic_bool quit_;

        // Circular buffer to hold items and a mutex for synchronization.
        std::mutex buffer_lock_;
        boost::circular_buffer< int > buffer_;

        // Condition variables for the threads to signal each other.
        std::condition_variable buffer_has_data_;
        std::condition_variable buffer_is_empty_;

        std::thread producer_;
        std::thread consumer_;
};


int main( int argc, char **argv )
{
    (void)argc; (void) argv;

    prosumer test;

    // Let the prosumer work for a little while.
    std::this_thread::sleep_for( std::chrono::seconds( 3 ) );

    return EXIT_SUCCESS;
}

Если вы посмотрите на потоковые функции производителя_func и Consumer_func, вы увидите, что они зацикливаются до тех пор, пока деструктор prosumer не установит переменную quit, но они также снова проверяют переменную quit после блокировки циклического буфера. Если была установлена ​​переменная quit, они сигнализируют друг другу, чтобы предотвратить взаимоблокировку.

Еще у меня была идея вызвать notify_one() для условных переменных из деструктора, будет ли это лучшим решением?

Есть лучший способ сделать это?

Обновление 1: я забыл упомянуть, что в этом случае, когда потокам предлагается выйти, потребителю не нужно потреблять какие-либо оставшиеся данные в циклическом буфере, и это нормально, если производитель также производит немного больше. Пока они оба выходят и не заходят в тупик, все будет хорошо.


person x-x    schedule 04.01.2013    source источник
comment
Не имеет отношения к вашему вопросу, но вы не блокируете производителя buffer_ до тех пор, пока не заполните вектор из 2-й итерации?   -  person Jagannath    schedule 04.01.2013
comment
@Jagannath, извините, но не могли бы вы объяснить подробнее? Я не думаю, что понимаю ваш комментарий. Разве condition_variable::wait() не требует блокировки при разблокировке, чтобы циклический буфер был защищен, когда в него вставляется содержимое вектора?   -  person x-x    schedule 04.01.2013
comment
1-я итерация в производителе: вы блокируете буфер_, затем ждете переменную условия, затем вставляете данные в буфер_, а затем сигнализируете потребителю, вызывая notify_one(), а затем снова вставляете данные в вектор. Когда снимается блокировка на buffer_ ?   -  person Jagannath    schedule 04.01.2013
comment
Может быть, мне не хватает какой-то концепции здесь.   -  person Jagannath    schedule 04.01.2013
comment
@Jagannath, переменная std::unique_lock‹ std::mutex › lock(buffer_lock_) выходит из области видимости сразу после вызова buffer_has_data_.notify_one() и разблокирует мьютекс. Поправьте меня если я ошибаюсь!   -  person x-x    schedule 04.01.2013
comment
Извините моя ошибка. Проглядел это.   -  person Jagannath    schedule 04.01.2013
comment
Я бы проверил, как это реализовали другие, см. проблема производитель-потребитель. В любом случае, по моему мнению, переменная quit_ должна быть volatile.   -  person Ali    schedule 04.01.2013
comment
@Ali: Не должно. volatile не является ни необходимым, ни достаточным для многопоточного программирования. Это должно быть атомарно через std::atomic<>. К счастью для нас, этот конкретный вариант использования еще больше упрощается с помощью std::atomic_flag. Вы правы, видя риск: этот код имеет конфликт чтения-записи для этой переменной.   -  person GManNickG    schedule 04.01.2013
comment
@GManNickG Спасибо, я все еще изучаю C++11. А вообще, что не так с volatile bool? Какая-то книга (не помню какая) пропагандировала это для этих целей. Согласен, теперь, когда у нас есть стандартные решения, мы должны их придерживаться. Вы имеете в виду, что у нас есть хороший, потокобезопасный test_and_set для atomic_flag, который мы не можем сделать с volatile bool без мьютекса?   -  person Ali    schedule 04.01.2013
comment
@Ali: volatile просто не гарантирует атомарность или видимость. volatile — это ключевое слово, которое говорит, что операции чтения и записи в эту переменную являются наблюдаемым поведением, так же как и все они, что приводит к отключению оптимизации этой переменной. Чего он не делает, так это не говорит, как должны происходить эти операции чтения и записи. Это просто неправильная вещь, которая ушла навсегда.   -  person GManNickG    schedule 04.01.2013
comment
@GManNickG Пожалуйста, расскажите об наблюдаемом поведении или дайте несколько ссылок, где это объясняется (для чайников). Боюсь, я этого не понимаю.   -  person Ali    schedule 04.01.2013
comment
@Ali: это технический термин из стандарта. Если вы прочитаете это, вам следует будь хорошим. По сути, результатом работы программы является ее наблюдаемое поведение. Результат, который компилятор может оптимизировать x = 2 + 2; до x = 4;, заключается в том, что вы не можете наблюдать эффекты 2 + 2, поэтому, заменив его на 4 напрямую, как если бы он выполнил вычисление.   -  person GManNickG    schedule 04.01.2013
comment
@GManNickG Мне, вероятно, следует опубликовать вопрос с некоторым кодом, где вы можете объяснить, что может пойти не так. Я также должен откопать ту книгу, которая защищала volatile bool. Я уверен, что вы правы, но я хотел бы полностью понять вас. Пожалуйста, дайте мне немного времени. Я дам вам знать о вопросе. Спасибо!   -  person Ali    schedule 04.01.2013
comment
@GManNickG Хорошо, я вижу, что должен сам поработать над тем постом, на который вы ссылаетесь, он содержит интересную информацию. Спасибо! :)   -  person Ali    schedule 04.01.2013
comment
давайте продолжим это обсуждение в чате   -  person GManNickG    schedule 04.01.2013
comment
@GManNickG, почему флаг выхода должен быть std::atomic‹bool› или std::atomic_flag? Разве разница между false и !false не в одном бите? Конечно, изменение одного бита является атомарным на аппаратном уровне всех процессоров?   -  person x-x    schedule 05.01.2013
comment
@DrTwox: Нет. Кроме того, процессоры обычно не работают с отдельными битами, поэтому вам нужно получить эту гарантию для исходного размера слова. В любом случае, зачем гадать и надеяться, если можно просто использовать существующие утилиты? Если выяснится, что на вашем конкретном оборудовании не требуется дополнительной работы, они оптимизируют его. (Или, другими словами: если бы у нас были такие гарантии, комитет по стандартам не тратил бы годы на формализацию модели памяти вместе со всей библиотекой потоков и атомарной обработки.)   -  person GManNickG    schedule 05.01.2013
comment
@GManNickG, если логическая переменная ложна, то все биты равны 0. Если логическая переменная равна !false, то один или несколько битов равны 1. Это правильный способ взглянуть на логическое значение в C++? ... процессоры обычно не работают с отдельными битами ... Именно это я и хочу сказать! Если изменение одного бита в логическом значении изменяет if с false на !false, конечно, это должно быть атомарным? Что я здесь не понимаю или упускаю?   -  person x-x    schedule 05.01.2013
comment
@DrTwox: реализация bool совершенно не указана. sizeof(bool) может быть 1, 2, 4 или 128, если ваш компилятор так считает. Представление значения false может состоять из всех нулей, всех единиц или какой-то магической битовой комбинации, аналогично с true. Это просто совершенно не указано. Чего вам не хватает, так это того, что язык C++ не дает никаких гарантий атомарности или видимости без std::atomic. Так что, если у вас его нет, нет смысла удивляться. Конечно, вы можете продолжать исследовать его на конкретном компиляторе, на конкретной платформе и т. д., но это не C++. Это C++ на платформе X.   -  person GManNickG    schedule 05.01.2013
comment
@GManNickG, спасибо за разъяснение! Я был убежден, что в C++ в стандарте bool определяется как false = 0 и true = !false, что делает sizeof(bool) нерелевантным для моей точки зрения. Именно это предположение и вызвало недоразумение. Еще раз спасибо, что прояснили это!   -  person x-x    schedule 05.01.2013
comment
@DrTwox: Нет проблем. Когда логическое значение, равное false, преобразуется в целое число, результатом действительно является 0, но не указано, сохраняется логическое значение как целое число со значением 0. true действительно !false. '   -  person GManNickG    schedule 05.01.2013
comment
Если вы не возражаете против использования усиленных потоков и условных переменных, вы можете использовать thread::interrupt().   -  person Anonymous Coward    schedule 05.01.2013


Ответы (2)


На мой взгляд, вызов notify_one (или, скорее, notify_all, если вы хотите расширить свой буфер до нескольких производителей/потребителей) для обеих условных переменных в деструкторе перед вызовами соединения будет предпочтительным решением по нескольким причинам:

Во-первых, это соответствует тому, как обычно используются условные переменные: устанавливая quit_, вы изменяете состояние, в котором заинтересованы и ожидают потоки-производители/потребители, поэтому вы должны уведомить их об изменении состояния.

Кроме того, notify_one не должна быть очень дорогостоящей операцией.

Кроме того, в более реалистичном приложении между производством двух элементов может быть задержка; в этом случае вы можете не захотеть блокировать свой деструктор, пока потребитель не заметит, что он должен отменить, как только следующий элемент будет поставлен в очередь; в примере кода, насколько я вижу, этого не происходит.

person Phillip Keldenich    schedule 04.01.2013
comment
Я забыл упомянуть, что вам, вероятно, следует заблокировать свой мьютекс, прежде чем установить quit_ в значение true, или использовать какой-либо другой способ (например, атомный, но я не уверен, оставляет ли это возможное состояние гонки) для предотвращения UB - я полагаю, что компилятор может, с вашим код, можно оптимизировать проверки quit_, хотя я не слишком в этом уверен. - person Phillip Keldenich; 04.01.2013
comment
Спасибо за ответ. Я обновил вопрос, чтобы охватить последний абзац вашего ответа. В реальном приложении не имеет значения, остались ли элементы в кольцевом буфере при вызове деструктора. - person x-x; 05.01.2013

На мой взгляд, есть две функции, которые можно разделить:

  1. передача и отправка сообщений
  2. производить и потреблять

Имеет смысл действительно разделить их: «рабочий» поток не делает ничего, кроме обработки «сообщений», которые могут означать «выход» или «выполнение работы».

Таким образом, вы можете создать общий рабочий класс, который агрегирует реальную функцию. Методы produce и consume остаются чистыми, а класс worker заботится только о продолжении работы.

person xtofl    schedule 04.01.2013
comment
Можете ли вы уточнить? Предположительно, в какой-то момент методы производства и потребления все еще должны будут проверять какое-то состояние, чтобы определить, должны ли они продолжать или завершать работу? Класс prosumer, насколько я понимаю, уже обрабатывает передачу и диспетчеризацию сообщений с помощью переменной quit, которая является единственным сообщением, о котором должны знать потоки. - person x-x; 05.01.2013