Выбрать главу

 ... // Выполнить инициализации

 for (x1 = 0; x1 < num_x_lines; x1++) {

  pthread_create(NULL, NULL, do_one_line, (void*)x1);

 }

 ... // Вывести результат

}

С таким упрощением связано множество проблем. Первая из них (и самая незначительная) состоит в том, что функцию do_one_line() придется модифицировать так, чтобы она могла в качестве своего аргумента принимать значение типа void* вместо int. Это можно легко исправить с помощью оператора приведения типа (typecast).

Вторая проблема несколько сложнее. Скажем, что разрешающая способность дисплея, для которой вы рассчитывали картинку, была равна 1280×1024. Нам пришлось бы создать 1280 потоков! В общем-то, для QNX/Neutrino это не проблема — QNX/Neutrino позволяет создавать до 32767 потоков в одном процессе! Однако, каждый поток должен иметь свой уникальный стек. Если ваш стек имеет разумный размер (скажем 8 Кб), эта программа израсходует под стек 1280×8 Кб (10 мегабайт!) ОЗУ. И ради чего? В вашей системе есть только 4 процессора. Это означает, что только 4 из этих 1280 потоков будут работать одновременно, а другие 1276 потоков будут ожидать доступа к процессору. (В действительности, в данном случае пространство под стек будет выделяться только по мере необходимости. Но тем не менее, это все равно расходование ресурсов впустую — есть ведь еще и другие издержки.)

Более красивым способом решения этой задачи было бы разбить ее на 4 части (по одной подзадаче на каждый процессор), и обрабатывать каждую часть как отдельный поток:

int num_lines_per_cpu;

int num_cpus;

int main (int argc, char **argv) {

 int cpu;

 ... // Выполнить инициализации

 // Получить число процессоров

 num_cpus = _syspage_ptr->num_cpu;

 num_lines_per_cpu = num_x_lines / num_cpus;

 for (cpu = 0; cpu < num_cpus; cpu++) {

  pthread_create(NULL, NULL, do_one_batch, (void*)cpu);

 }

 ... // Вывести результат

}

void* do_one_batch(void *c) {

 int cpu = (int)c;

 int x1;

 for (x1 = 0; x1 < num_lines_per_cpu; x1++) {

  do_line_line(x1 + cpu * num_lines_per_cpu);

 }

}

Здесь мы запускаем только num_cpus потоков. Каждый поток будет выполняться на отдельном процессоре. А поскольку мы имеем дело с небольшим числом потоков, мы тем самым не засоряем память ненужными стеками. Обратите внимание, что мы получили число процессоров путем разыменования глобальной переменной — указателя на системную страницу _syspage_ptr. (Дополнительную информацию относительно системной страницы можно найти в книге «Building Embedded Systems» (поставляется в комплекте документации по QNX/ Neutrino — прим. ред.) или в заголовочном файле <sys/syspage.h>).

Программирование для одного или нескольких процессоров

Последняя программа в первую очередь интересна тем, что будет корректно функционировать в системе с одиночным процессором тоже. Просто будет создан только один поток, который и выполнит всю работу. Дополнительные издержки (один стек) с лихвой окупаются гибкостью программы, умеющей работать быстрее в многопроцессорной системе.

Синхронизация по отношению к моменту завершения потока

Я уже упоминал, что с приведенным выше упрощенным примером программы связана масса проблем. Так вот, еще одна связанная с ним проблема состоит в том, что функция main() сначала запускает целый букет потоков, а затем отображает результаты. Но как функция узнает, когда уже можно выводить результаты?

Заставлять main() заниматься опросом, закончены ли вычисления, противоречит самому замыслу ОС реального времени.

int main (int argc, char **argv) {

 ...

 // Запустить потоки, как раньше

 while (num_lines_completed < num_x_lines) {

  sleep(1);

 }

}

He вздумайте писать такие программы!

Для решения этой задачи существуют два изящных решения: применение функций pthread_join() и barrier_wait().

«Присоединение» (joining)

Самый простой метод синхронизации — это «присоединение» потоков. Реально это действие означает ожидание завершения.

Присоединение выполняется одним потоком, ждущим завершения другого потока. Ждущий поток вызывает pthread_join():

#include <pthread.h>

int pthread_join(pthread_t thread, void **value_ptr);

Функции pthread_join() передается идентификатор потока, к которому вы желаете присоединиться, а также необязательный аргумент value_ptr, который может быть использован для сохранения возвращаемого присоединяемым потоком значения (Вы можете передать вместо этого параметра NULL, если это значение для вас не представляет интереса — в данном примере мы так и сделаем).

Где нам брать идентификатор потока? Мы игнорировали его в функции pthread_create(), передав NULL в качестве первого параметра. Давайте исправим нашу программу:

int num_lines_per_cpu;

int num_cpus;

int main(int argc, char **argv) {

 int cpu;

 pthread_t *thread_ids;

 ... // Выполнить инициализации

 thread_ids = malloc(sizeof(pthread_t) * num_cpus);

 num_lines_per_cpu = num_x_lines / num_cpus;

 for (cpu = 0; cpu < num_cpus; cpu++) {

  pthread_create(

   &thread_ids[cpu], NULL, do_one_batch, (void*)cpu);

 }

 // Синхронизироваться с завершением всех потоков

 for (cpu = 0; cpu < num_cpus; cpu++) {

  pthread_join(thread_ids[cpu], NULL);

 }

 ... // Вывести результат

}

Обратите внимание, что на этот раз мы передали функции pthread_create() в качестве первого аргумента указатель на pthread_t. Там и будет сохранен идентификатор вновь созданного потока. После того как первый цикл for завершится, у нас будет num_cpu работающих потоков, плюс поток, выполняющий main(). Потребление ресурсов процессора потоком main() нас мало интересует — этот поток потратит все свое время на ожидание.

Ожидание достигается применением функции pthread_join() к каждому из наших потоков. Сначала мы ждем завершения потока thread_ids[0]. Когда он завершится, функция pthread_join() разблокируется. Следующая итерация цикла for заставит нас ждать завершения потока thread_ids[1], и так далее для всех num_cpus потоков.