У меня есть класс, который реализует многопоточную систему производителя/потребителя, используя мьютекс и две переменные условия для синхронизации. Производитель сигнализирует потоку-потребителю, когда есть элементы для использования, а потребитель сигнализирует потоку-производителю, когда он потребляет элементы. Потоки продолжают производить и потреблять до тех пор, пока деструктор не попросит их выйти, установив логическую переменную. Поскольку любой из потоков может ожидать переменную условия, я должен реализовать вторую проверку переменной quit, что кажется неправильным и беспорядочным...
Я сократил проблему до следующего примера (работа над GNU/Linux с g++4.7):
// C++11and Boost required.
#include <cstdlib> // std::rand()
#include <cassert>
#include <boost/circular_buffer.hpp>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
// Creates a single producer and single consumer thread.
class prosumer
{
public:
// Create the circular buffer and start the producer and consumer thread.
prosumer()
: quit_{ false }
, buffer_{ circular_buffer_capacity }
, producer_{ &prosumer::producer_func, this }
, consumer_{ &prosumer::consumer_func, this }
{}
// Set the quit flag and wait for the threads to exit.
~prosumer()
{
quit_ = true;
producer_.join();
consumer_.join();
}
private:
// Thread entry point for the producer.
void producer_func()
{
// Value to add to the ringbuffer to simulate data.
int counter = 0;
while ( quit_ == false )
{
// Simulate the production of some data.
std::vector< int > produced_items;
const auto items_to_produce = std::rand() % circular_buffer_capacity;
for ( int i = 0; i < items_to_produce; ++i )
{
produced_items.push_back( ++counter );
}
// Get a lock on the circular buffer.
std::unique_lock< std::mutex > lock( buffer_lock_ );
// Wait for the buffer to be emptied or the quit flag to be set.
buffer_is_empty_.wait( lock, [this]()
{
return buffer_.empty() == true || quit_ != false;
} );
// Check if the thread was requested to quit.
if ( quit_ != false )
{
// Don't let the consumer deadlock.
buffer_has_data_.notify_one();
break;
}
// The buffer is locked by this thread. Put the data into it.
buffer_.insert( std::end( buffer_ ), std::begin( produced_items ), std::end( produced_items ) );
// Notify the consumer that the buffer has some data in it.
buffer_has_data_.notify_one();
}
std::cout << "producer thread quit\n";
}
// Thread entry for the consumer.
void consumer_func()
{
int counter_check = 0;
while ( quit_ == false )
{
std::unique_lock< std::mutex > lock( buffer_lock_ );
// Wait for the buffer to have some data before trying to read from it.
buffer_has_data_.wait( lock, [this]()
{
return buffer_.empty() == false || quit_ != false;
} );
// Check if the thread was requested to quit.
if ( quit_ != false )
{
// Don't let the producer deadlock.
buffer_is_empty_.notify_one();
break;
}
// The buffer is locked by this thread. Simulate consuming the data.
for ( auto i : buffer_ ) assert( i == ++counter_check );
buffer_.clear();
// Notify the producer thread that the buffer is empty.
buffer_is_empty_.notify_one();
}
std::cout << "consumer thread quit\n";
}
// How many items the circular buffer can hold.
static const int circular_buffer_capacity = 64;
// Flag set in the destructor to signal the threads to stop.
std::atomic_bool quit_;
// Circular buffer to hold items and a mutex for synchronization.
std::mutex buffer_lock_;
boost::circular_buffer< int > buffer_;
// Condition variables for the threads to signal each other.
std::condition_variable buffer_has_data_;
std::condition_variable buffer_is_empty_;
std::thread producer_;
std::thread consumer_;
};
int main( int argc, char **argv )
{
(void)argc; (void) argv;
prosumer test;
// Let the prosumer work for a little while.
std::this_thread::sleep_for( std::chrono::seconds( 3 ) );
return EXIT_SUCCESS;
}
Если вы посмотрите на потоковые функции производителя_func и Consumer_func, вы увидите, что они зацикливаются до тех пор, пока деструктор prosumer не установит переменную quit, но они также снова проверяют переменную quit после блокировки циклического буфера. Если была установлена переменная quit, они сигнализируют друг другу, чтобы предотвратить взаимоблокировку.
Еще у меня была идея вызвать notify_one() для условных переменных из деструктора, будет ли это лучшим решением?
Есть лучший способ сделать это?
Обновление 1: я забыл упомянуть, что в этом случае, когда потокам предлагается выйти, потребителю не нужно потреблять какие-либо оставшиеся данные в циклическом буфере, и это нормально, если производитель также производит немного больше. Пока они оба выходят и не заходят в тупик, все будет хорошо.
quit_
должна бытьvolatile
. - person Ali   schedule 04.01.2013volatile
не является ни необходимым, ни достаточным для многопоточного программирования. Это должно быть атомарно черезstd::atomic<>
. К счастью для нас, этот конкретный вариант использования еще больше упрощается с помощьюstd::atomic_flag
. Вы правы, видя риск: этот код имеет конфликт чтения-записи для этой переменной. - person GManNickG   schedule 04.01.2013volatile bool
? Какая-то книга (не помню какая) пропагандировала это для этих целей. Согласен, теперь, когда у нас есть стандартные решения, мы должны их придерживаться. Вы имеете в виду, что у нас есть хороший, потокобезопасныйtest_and_set
дляatomic_flag
, который мы не можем сделать сvolatile bool
без мьютекса? - person Ali   schedule 04.01.2013volatile
просто не гарантирует атомарность или видимость.volatile
— это ключевое слово, которое говорит, что операции чтения и записи в эту переменную являются наблюдаемым поведением, так же как и все они, что приводит к отключению оптимизации этой переменной. Чего он не делает, так это не говорит, как должны происходить эти операции чтения и записи. Это просто неправильная вещь, которая ушла навсегда. - person GManNickG   schedule 04.01.2013x = 2 + 2;
доx = 4;
, заключается в том, что вы не можете наблюдать эффекты2 + 2
, поэтому, заменив его на4
напрямую, как если бы он выполнил вычисление. - person GManNickG   schedule 04.01.2013volatile bool
. Я уверен, что вы правы, но я хотел бы полностью понять вас. Пожалуйста, дайте мне немного времени. Я дам вам знать о вопросе. Спасибо! - person Ali   schedule 04.01.2013bool
совершенно не указана.sizeof(bool)
может быть 1, 2, 4 или 128, если ваш компилятор так считает. Представление значенияfalse
может состоять из всех нулей, всех единиц или какой-то магической битовой комбинации, аналогично сtrue
. Это просто совершенно не указано. Чего вам не хватает, так это того, что язык C++ не дает никаких гарантий атомарности или видимости безstd::atomic
. Так что, если у вас его нет, нет смысла удивляться. Конечно, вы можете продолжать исследовать его на конкретном компиляторе, на конкретной платформе и т. д., но это не C++. Это C++ на платформе X. - person GManNickG   schedule 05.01.2013false
, преобразуется в целое число, результатом действительно является0
, но не указано, сохраняется логическое значение как целое число со значением 0.true
действительно!false
. ' - person GManNickG   schedule 05.01.2013thread::interrupt()
. - person Anonymous Coward   schedule 05.01.2013