Выбрать главу

Другой ключевой параметр, предназначенный для управления потоками, — это параметр flags, передаваемый функции thread_pool_create(). Он может принимать одно из следующих значений:

POOL_FLAG_EXIT_SELF

Не делать возврат из функции thread_pool_start() и не включать вызывающий поток в пул.

POOL_FLAG_USE_SELF

Не делать возврат из функции thread_pool_start(), но включить вызывающий поток в пул.

0

Функция thread_pool_start() возвратится, новые потоки будут создаваться по мере необходимости.

Приведенное описание может показаться суховатым. Давайте рассмотрим пример.

В управляющей структуре пула потоков сконцентрируем наше внимание только на значениях параметров lo_water, increment и maximum:

/*

 * tp1.с

 *

 * Пример с пулами потоков (1)

 *

*/

#include <stdio.h>

#include <stdlib.h>

#include <errno.h>

#include <sys/neutrino.h>

#include <sys/dispatch.h>

char *progname = "tp1";

void tag (char *name) {

 time_t t;

 char buffer[BUFSIZ];

 time(&t);

 strftime(buffer, BUFSIZ, "%T ", localtime(&t));

 printf("%s %3d %-20.20s: ", buffer, pthread_self(), name);

}

THREAD_POOL_PARAM_T* blockfunc(

 THREAD_POOL_PARAM_T *ctp) {

 tag("blockfunc");

 printf("ctp %p\n", ctp);

 tag("blockfunc");

 printf("sleep (%d);\n", 15 * pthread_self());

 sleep(pthread_self() * 15);

 tag("blockfunc");

 printf("Выполнили sleep\n");

 tag("blockfunc");

 printf("Возвращаем 0x%08X\n",

  0x10000000 + pthread_self());

 return((void*)(0x10000000 + pthread_self()));

 // Передано handlerfunc

}

THREAD_POOL_PARAM_T* contextalloc(

 THREAD_POOL_HANDLE_T *handle) {

 tag("contextalloc");

 printf("handle %p\n", handle);

 tag("contextalloc");

 printf("Возвращаем 0x%08X\n",

  0x20000000 + pthread_self());

 return ((void*)(0x20000000 + pthread_self()));

 // Передано blockfunc

}

void contextfree(THREAD_POOL_PARAM_T *param) {

 tag("contextfree");

 printf("param %p\n", param);

}

void unblockfunc(THREAD_POOL_PARAM_T *ctp) {

 tag("unblockfunc");

 printf("ctp %p\n", ctp);

}

int handlerfunc(THREAD_POOL_PARAM_T *ctp) {

 static int i = 0;

 tag("handlerfunc");

 printf("ctp %p\n", ctp);

 if (i++ > 15) {

  tag("handlerfunc");

  printf("Более 15 операций, возвращаем 0\n");

  return (0);

 }

 tag("handlerfunc");

 printf("sleep (%d)\n", pthread_self() * 25);

 sleep(pthread_self() * 25);

 tag("handlerfunc");

 printf("Выполнили sleep\n");

 tag("handlerfunc");

 printf("Возвращаем 0x%08X\n",

  0x30000000 + pthread_self());

 return (0x30000000 + pthread_self());

}

main() {

 thread_pool_attr_t tp_attr;

 void *tpp;

 memset(&tp_attr, 0, sizeof(tp_attr));

 tp_attr.handle = (void*)0x12345678;

  // Передано contextalloc

 tp_attr.block_func = blockfunc;

 tp_attr.unblock_func = unblockfunc;

 tp_attr.context_alloc = contextalloc;

 tp_attr.context_free = contextfree;

 tp_attr.handler_func = handlerfunc;

 tp_attr.lo_water = 3;

 tp_attr.hi_water = 7;

 tp_attr.increment = 2;

 tp_attr.maximum = 10;

 if ((tpp =

  thread_pool_create(&tp_attr, POOL_FLAG_USE_SELF)) ==

   NULL) {

  fprintf(stderr,

   "%s: Ошибка thread_pool_create, errno %s\n",

   progname, strerror(errno));

  exit(EXIT_FAILURE);

 }

 thread_pool_start(tpp);

 fprintf(stderr,

  "%s: возврат из thread_pool_start; errno %s\n",

  progname, strerror(errno));

 sleep(3000);

 exit(EXIT_FAILURE);

}

После установки параметров мы вызываем функцию thread_pool_create() для создания пула потоков. Эта функция возвращает указатель на управляющую структуру пула потоков (tpp), который мы проверяем на равенство NULL (что указало бы на ошибку). И, наконец, мы вызываем функцию thread_pool_start(), передав ей эту самую управляющую структуру tpp.

Я указал флаг POOL_FLAG_USE_SELF, что означает, что поток, вызвавший функцию thread_pool_start(), будет рассматриваться как доступный для ввода в пул. Таким образом, на момент старта пула в нем есть только один поток. Поскольку значение параметра lo_water равно 3, библиотека немедленно создаст еще increment потоков (в нашем случае — 2). С этого момента в пуле будет три (3) потока, и все они будут находиться в режиме блокирования. Условие по параметру lo_water удовлетворено, потому что число потоков в режиме блокирования действительно не меньше lo_water, условие по параметру hi_water удовлетворено, потому что число потоков в режиме блокирования действительно не больше hi_water; и, наконец, также удовлетворено условие по параметру maximum, потому что общее число потоков не превышает его значения. Допустим теперь, что один из потоков, находящихся в режиме блокирования, разблокируется (например, в серверном приложении — при получении сообщения). Это означает, что один из трех потоков перейдет из режима блокирования в режим обработки. Счетчик блокированных потоков уменьшится, и его значение упадет ниже значения параметра lo_water. Это переключит триггер lo_water и заставит библиотеку создать ещё increment (2) потоков. Таким образом, у нас будет всего 5 потоков (4 в режиме блокирования, и 1 — в режиме обработки).