Выбрать главу

Ну и как же всё это соотносится с листингом 4.1? В следующем листинге показано, как перенести оттуда код в методы push() и wait_and_pop().

Листинг 4.4. Реализация функций push() и wait_and_pop() на основе кода из листинга 4.1

#include <queue>

#include <mutex>

#include <condition_variable>

template<typename T>

class threadsafe_queue {

private:

 std::mutex mut;

 std::queue<T> data_queue;

 std::condition_variable data_cond;

public:

 void push(T new_value) {

  std::lock_guard<std::mutex> lk(mut);

  data_queue.push(new_value);

  data_cond.notify_one();

 }

 void wait_and_pop(T& value) {

  std::unique_lock<std::mutex> lk(mut);

  data_cond.wait(lk, [this]{return !data_queue.empty();});

  value = data_queue.front();

  data_queue.pop();

 }

};

threadsafe_queue<data_chunk> data_queue; ← (1)

void data_preparation_thread() {

 while (more_data_to_prepare()) {

  data_chunk const data = prepare_data();

  data_queue.push(data); ← (2)

 }

}

void data_processing_thread() {

 while (true) {

  data_chunk data;

  data_queue.wait_and_pop(data); ← (3)

  process(data);

  if (is_last_chunk(data))

   break;

 }

}

Теперь мьютекс и условная переменная находятся в экземпляре threadsafe_queue, поэтому не нужно ни отдельных переменных (1), ни внешней синхронизации при обращении к функции push() (2). Кроме того, wait_and_pop() берет на себя заботу об ожидании условной переменной (3).

Второй перегруженный вариант wait_and_pop() тривиален, а остальные функции можно почти без изменений скопировать из кода стека в листинге 3.5. Ниже приведена окончательная реализация.

Листинг 4.5. Полное определение класса потокобезопасной очереди на базе условных переменных

#include <queue>

#include <memory>

#include <mutex>

#include <condition_variable>

template<typename T>

class threadsafe_queue {

private:

 mutable std::mutex mut;← (1) Мьютекс должен быть изменяемым

 std::queue<T> data_queue;

 std::condition_variable data_cond;

public:

 threadsafe_queue() {}

 threadsafe_queue(threadsafe_queue const& other) {

  std::lock_guard<std::mutex> lk(other.mut);

  data_queue = other.data_queue;

 }

 void push(T new_value) {

  std::lock_guard<std::mutex> lk(mut);

  data_queue.push(new_value);

  data_cond.notify_one();

 }

 void wait_and_pop(T& value) {

  std::unique_lock<std::mutex> lk(mut);

  data_cond.wait(lk, [this]{ return !data_queue.empty(); });

  value = data_queue.front();

  data_queue.pop();

 }

 std::shared_ptr<T> wait_and_pop() {

  std::unique_lock<std::mutex> lk(mut);

  data_cond.wait(lk, [this]{ return !data_queue.empty(); });

  std::shared_ptr<T>

   res(std::make_shared<T>(data_queue.front()));

  data_queue.pop();

  return res;

 }

 bool try_pop(T& value) {

  std::lock_guard<std::mutex> lk(mut);