... // Выполнить инициализации
for (x1 = 0; x1 < num_x_lines; x1++) {
pthread_create(NULL, NULL, do_one_line, (void*)x1);
}
... // Вывести результат
}
С таким упрощением связано множество проблем. Первая из них (и самая незначительная) состоит в том, что функцию do_one_line() придется модифицировать так, чтобы она могла в качестве своего аргумента принимать значение типа void*
вместо int
. Это можно легко исправить с помощью оператора приведения типа (typecast).
Вторая проблема несколько сложнее. Скажем, что разрешающая способность дисплея, для которой вы рассчитывали картинку, была равна 1280×1024. Нам пришлось бы создать 1280 потоков! В общем-то, для QNX/Neutrino это не проблема — QNX/Neutrino позволяет создавать до 32767 потоков в одном процессе! Однако, каждый поток должен иметь свой уникальный стек. Если ваш стек имеет разумный размер (скажем 8 Кб), эта программа израсходует под стек 1280×8 Кб (10 мегабайт!) ОЗУ. И ради чего? В вашей системе есть только 4 процессора. Это означает, что только 4 из этих 1280 потоков будут работать одновременно, а другие 1276 потоков будут ожидать доступа к процессору. (В действительности, в данном случае пространство под стек будет выделяться только по мере необходимости. Но тем не менее, это все равно расходование ресурсов впустую — есть ведь еще и другие издержки.)
Более красивым способом решения этой задачи было бы разбить ее на 4 части (по одной подзадаче на каждый процессор), и обрабатывать каждую часть как отдельный поток:
int num_lines_per_cpu;
int num_cpus;
int main (int argc, char **argv) {
int cpu;
... // Выполнить инициализации
// Получить число процессоров
num_cpus = _syspage_ptr->num_cpu;
num_lines_per_cpu = num_x_lines / num_cpus;
for (cpu = 0; cpu < num_cpus; cpu++) {
pthread_create(NULL, NULL, do_one_batch, (void*)cpu);
}
... // Вывести результат
}
void* do_one_batch(void *c) {
int cpu = (int)c;
int x1;
for (x1 = 0; x1 < num_lines_per_cpu; x1++) {
do_line_line(x1 + cpu * num_lines_per_cpu);
}
}
Здесь мы запускаем только num_cpus потоков. Каждый поток будет выполняться на отдельном процессоре. А поскольку мы имеем дело с небольшим числом потоков, мы тем самым не засоряем память ненужными стеками. Обратите внимание, что мы получили число процессоров путем разыменования глобальной переменной — указателя на системную страницу _syspage_ptr. (Дополнительную информацию относительно системной страницы можно найти в книге «Building Embedded Systems» (поставляется в комплекте документации по QNX/ Neutrino — прим. ред.) или в заголовочном файле <sys/syspage.h>
).
Последняя программа в первую очередь интересна тем, что будет корректно функционировать в системе с одиночным процессором тоже. Просто будет создан только один поток, который и выполнит всю работу. Дополнительные издержки (один стек) с лихвой окупаются гибкостью программы, умеющей работать быстрее в многопроцессорной системе.
Я уже упоминал, что с приведенным выше упрощенным примером программы связана масса проблем. Так вот, еще одна связанная с ним проблема состоит в том, что функция main() сначала запускает целый букет потоков, а затем отображает результаты. Но как функция узнает, когда уже можно выводить результаты?
Заставлять main() заниматься опросом, закончены ли вычисления, противоречит самому замыслу ОС реального времени.
int main (int argc, char **argv) {
...
// Запустить потоки, как раньше
while (num_lines_completed < num_x_lines) {
sleep(1);
}
}
He вздумайте писать такие программы!
Для решения этой задачи существуют два изящных решения: применение функций pthread_join() и barrier_wait().
Самый простой метод синхронизации — это «присоединение» потоков. Реально это действие означает ожидание завершения.
Присоединение выполняется одним потоком, ждущим завершения другого потока. Ждущий поток вызывает pthread_join():
#include <pthread.h>
int pthread_join(pthread_t thread, void **value_ptr);
Функции pthread_join() передается идентификатор потока, к которому вы желаете присоединиться, а также необязательный аргумент value_ptr, который может быть использован для сохранения возвращаемого присоединяемым потоком значения (Вы можете передать вместо этого параметра NULL, если это значение для вас не представляет интереса — в данном примере мы так и сделаем).
Где нам брать идентификатор потока? Мы игнорировали его в функции pthread_create(), передав NULL в качестве первого параметра. Давайте исправим нашу программу:
int num_lines_per_cpu;
int num_cpus;
int main(int argc, char **argv) {
int cpu;
pthread_t *thread_ids;
... // Выполнить инициализации
thread_ids = malloc(sizeof(pthread_t) * num_cpus);
num_lines_per_cpu = num_x_lines / num_cpus;
for (cpu = 0; cpu < num_cpus; cpu++) {
pthread_create(
&thread_ids[cpu], NULL, do_one_batch, (void*)cpu);
}
// Синхронизироваться с завершением всех потоков
for (cpu = 0; cpu < num_cpus; cpu++) {
pthread_join(thread_ids[cpu], NULL);
}
... // Вывести результат
}
Обратите внимание, что на этот раз мы передали функции pthread_create() в качестве первого аргумента указатель на pthread_t
. Там и будет сохранен идентификатор вновь созданного потока. После того как первый цикл for
завершится, у нас будет num_cpu работающих потоков, плюс поток, выполняющий main(). Потребление ресурсов процессора потоком main() нас мало интересует — этот поток потратит все свое время на ожидание.
Ожидание достигается применением функции pthread_join() к каждому из наших потоков. Сначала мы ждем завершения потока thread_ids[0]
. Когда он завершится, функция pthread_join() разблокируется. Следующая итерация цикла for
заставит нас ждать завершения потока thread_ids[1]
, и так далее для всех num_cpus потоков.