template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
unsigned long const length = std::distance(first, last);
if (!length) ←
(1)
return init;
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length+min_per_thread - 1) / min_per_thread; ←
(2)
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
unsigned long const num_threads = ←
(3)
std::min(
hardware.threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads; ←
(4)
std::vector<T> results(num_threads);
std::vector<std::thread> threads(num_threads - 1); ←
(5)
Iterator block_start = first;
for(unsigned long i = 0; i < (num_threads - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size); ←
(6)
threads[i] = std::thread( ←
(7)
accumulate_block<Iterator, T>(),
block_start, block_end, std::ref(results(i)));
block_start = block_end; ←
(8)
}
accumulate_block()(
block_start, last, results[num_threads-1]); ←
(9)
std::for_each(threads.begin(), threads.end(),
std::mem_fn(&std::thread::join)); ←
(10)
return
std::accumulate(results.begin(), results.end(), init); ←
(11)
}
Хотя функция довольно длинная, по существу она очень проста. Если входной диапазон пуст (1), то мы сразу возвращаем начальное значение init
. В противном случае диапазон содержит хотя бы один элемент, поэтому мы можем разделить количество элементов на минимальный размер блока и получить максимальное число потоков (2).
Это позволит избежать создания 32 потоков на 32-ядерной машине, если диапазон состоит всего из пяти элементов.
Число запускаемых потоков равно минимуму из только что вычисленного максимума и количества аппаратных потоков (3): мы не хотим запускать больше потоков, чем может поддержать оборудование (это называется превышением лимита), так как из-за контекстных переключений при большем количестве потоков производительность снизится. Если функция std::thread::hardware_concurrency()
вернула 0, то мы берем произвольно выбранное число, я решил остановиться на 2. Мы не хотим запускать слишком много потоков, потому что на одноядерной машине это только замедлило бы программу. Но и слишком мало потоков тоже плохо, так как это означало бы отказ от возможного параллелизма.
Каждый поток будет обрабатывать количество элементов, равное длине диапазона, поделенной на число потоков (4). Пусть вас не пугает случай, когда одно число нацело не делится на другое, — ниже мы рассмотрим его.
Теперь, зная, сколько необходимо потоков, мы можем создать вектор std::vector<T>
для хранения промежуточных результатов и вектор std::vector<std::thread>
для хранения потоков (5). Отметим, что запускать нужно на один поток меньше, чем num_threads
, потому что один поток у нас уже есть.
Запуск потоков производится в обычном цикле: мы сдвигаем итератор block_end
в конец текущего блока (6) и запускаем новый поток для аккумулирования результатов по этому блоку (7). Начало нового блока совпадает с концом текущего (8).
После того как все потоки запущены, главный поток может обработать последний блок (9). Именно здесь обрабатывается случай деления с остатком: мы знаем, что конец последнего блока — last
, а сколько в нем элементов, не имеет значения.
Аккумулировав результаты но последнему блоку, мы можем дождаться завершения всех запущенных потоков с помощью алгоритма std::for_each
(10), а затем сложить частичные результаты, обратившись к std::accumulate
(11).
Прежде чем расстаться с этим примером, полезно отметить, что в случае, когда оператор сложения, определенный в типе T
, не ассоциативен (например, если T
— это float
или double
), результаты, возвращаемые алгоритмами parallel_accumulate
и std::accumulate
, могут различаться из-за разбиения диапазона на блоки. Кроме того, к итераторам предъявляются более жесткие требования: они должны быть по меньшей мере однонаправленными, тогда как алгоритм std::accumulate
может работать и с однопроходными итераторами ввода. Наконец, тип T
должен допускать конструирование по умолчанию (удовлетворять требованиям концепции DefaultConstructible), чтобы можно было создать вектор results
. Такого рода изменения требований довольно типичны для параллельных алгоритмов: но самой своей природе они отличаются от последовательных алгоритмов, и это приводит к определенным последствиям в части как результатов, так и требований. Более подробно параллельные алгоритмы рассматриваются в главе 8. Стоит также отметить, что из-за невозможности вернуть значение непосредственно из потока, мы должны передавать ссылку на соответствующий элемент вектора results
. Другой способ возврата значений из потоков, с помощью будущих результатов, рассматривается в главе 4.