Выбрать главу

ConsumerInfo := PConsumerInfo(FConsumerInfo[aId]);

BufInfo := PBufferInfo(FBufferInfo[ConsumerInfo^.ciHead]);

NumToRead := InterLockedDecrement(BufInfo^.biToUseCount);

{переместить указатель начала очереди}

inc(ConsumerInfo^.ciHead);

if (ConsumerInfo^.ciHead >= FBufferCount) then

ConsumerInfo^.ciHead := 0;

{если данный поток был последним, который должен был использовать этот буфер, производителю нужно сигнализировать о необходимости генерирования новых данных}

if (NumToRead = 0) then

ReleaseSemaphore(FNeedsData, 1, nil);

end;

Конструктор и деструктор этого класса должны создавать и уничтожать большое количество объектов синхронизации, а также всю информацию о буфере и потребителе.

Листинг 12.18. Создание и уничтожение объекта синхронизации

constructor TtdProduceManyConsumeSync.Create(aBufferCount : integer;

aConsumerCount : integer);

var

NameZ : array [0..MAX_PATH] of AnsiChar;

i : integer;

BufInfo : PBufferInfo;

ConsumerInfo : PConsumerInfo;

begin

inherited Create;

{создать семафор "требуются данные"}

GetRandomObjName(NameZ, 'tdPMC.Needs Data');

FNeedsData := CreateSemaphore(nil, aBufferCount, aBufferCount, NameZ);

if (FNeedsData = INVALID_HANDLE_VALUE) then

RaiseLastWin32Error;

{создать циклическую очередь буферов и заполнить ее}

FBufferCount := aBufferCount;

FBufferInfo := TList.Create;

FBufferInfo.Count := aBufferCount;

for i := 0 to pred(aBufferCount) do

begin

New(BufInfo);

BufInfo^.biToUseCount :=0;

FBufferInfo[i] := BufInfo;

end;

{создать информационный список потребителей и заполнить его}

FConsumerCount := aConsumerCount;

FConsumerInfo := TList.Create;

FConsumerInfo.Count := aConsumerCount;

for i := 0 to pred(aConsumerCount) do

begin

New(ConsumerInfo);

FConsumerInfo[i] := ConsumerInfo;

GetRandomObjName(NameZ, 'tdPMC.HasData');

ConsumerInfo^.ciHasData :=

CreateSemaphore(nil, 0, aBufferCount, NameZ);

if (Consumer Info^.ciHasData = INVALID__HANDLE__VALUE) then

RaiseLastWin32Error;

ConsumerInfo^.ciHead := 0;

end;

end;

destructor TtdProduceManyConsumeSync.Destroy;

var

i : integer;

BufInfo : PBufferInfo;

ConsumerInfo : PConsumerInfo;

begin

{уничтожить семафор "требуются данные"}

if (FNeedsData <> INVALID_HANDLE_VALUE) then

CloseHandle(FNeedsData);

{уничтожить информационный список потребителей}

if (FConsumerInfo <> nil) then begin

for i := 0 to pred(FConsumerCount) do

begin

ConsumerInfo := PConsumerInfo(FConsumerInfo[i]);

if (ConsumerInfo <> nil) then begin

if (ConsumerInfo^.ciHasData <> INVALID__HANDLE__VALUE) then

CloseHandle(ConsumerInfo^.ciHasData);

Dispose(ConsumerInfo);

end;

end;

FConsumerInfo.Free;

end;

{уничтожить информационный список буферов}

if (FBufferInfo <> nil) then begin

for i := 0 to pred(FBufferCount) do

begin

BufInfo := PBufferInfo(FBufferInfo[i]);

if (BufInfo <> nil) then

Dispose(BufInfo);

end;

FBufferInfo.Free;

end;

inherited Destroy;

end;

Хотя, на первый взгляд, кажется, что в программе листинга 12.18 выполняется множество действий, в действительности все достаточно просто. Конструктор Create должен создать список буферов и заполнить его требуемым числом записей о буферах. Он должен также создать список потребителей и заполнить его соответствующим количеством записей о потребителях. Для каждой записи потребителя должен быть создан отдельный семафор. Деструктор Destroy должен уничтожить все эти объекты и освободить всю выделенную память.

Полный исходный код реализации класса TtdProduceManyConsumeSync можно найти на Web-сайте издательства, в разделе материалов. После выгрузки материалов отыщите среди них файл TDPCSync.pas.

В качестве примера программы мы рассмотрим подпрограмму многопоточного копирования, выполняющую копирование потока в три других потока. Как и в случае примера, приведенного в листинге 12.14, производитель будет считывать исходный поток в буфера, количество которых может доходить до 20. Потребители, количество которых теперь равняется трем, будут считывать буфера и выполнять запись в собственные потоки.

Класс TQueuedBuffers (листинг 12.19) должен быть несколько изменен, поскольку ему необходимо хранить указатель начала очереди для нескольких потребителей и, следовательно, он должен содержать массив таких указателей.

Листинг 12.19. Класс TQueuedBuffers для модели с несколькими потребителями type

PBuffer = ^TBuffer;

TBuffer = packed record

bCount : longint;

bBlock : array [0..pred(BufferSize) ] of byte;

end;

PBufferArray = ^TBufferArray;

TBufferArray = array [0..pred(MaxBuffers) ] of PBuffer;

TQueuedBuffers = class private

FBufCount : integer;

FBuffers : PBufferArray;

FConsumerCount : integer;

FHead : array [0..pred(MaxConsumers)] of integer;

FTail : integer;

protected

function qbGetHead(aInx : integer): PBuffer;

function qbGetTail : PBuffer;

public

constructor Create(aBufferCount : integer;

aConsumerCount : integer);

destructor Destroy; override;

procedureAdvanceHead(aConsumerId : integer);

procedure AdvanceTail;

property Head [aInx : integer] : PBuffer read qbGetHead;

property Tail : PBuffer read qbGetTail;

property ConsumerCount : integer read FConsumerCount;

end;

constructor TQueuedBuffers.Create(aBufferCount : integer;

aConsumerCount : integer);

var

i : integer;

begin

inherited Create;

{распределить буферы}

FBuffers := AllocMem(aBufferCount * sizeof(pointer));

for i := 0 to pred(aBufferCount) do

GetMem(FBuffers^[i], sizeof(TBuffer));

FBufCount := aBufferCount;

FConsumerCount := aConsumerCount;

end;

destructor TQueuedBuffers.Destroy;

var

i : integer;

begin

{освободить буферы}

if (FBuffers <> nil) then begin

for i := 0 to pred(FBufCount) do

if (FBuffers^[i] <> nil) then

FreeMem(FBuffers^[i], sizeof(TBuffer));

FreeMem(FBuffers, FBufCount * sizeof(pointer));

end;

inherited Destroy;