Выбрать главу

begin

{освободить буферы}

if (FBuffers <> nil) then begin

for i := 0 to pred( FBuf Count) do

if (FBuffers^[i] <> nil) then

FreeMem(FBuffers^[i], sizeof(TBuffer));

FreeMem(FBuffers, FBufCount * sizeof(pointer));

end;

inherited Destroy;

end;

procedure TQueuedBuffers.AdvanceHead;

begin

inc(FHead);

if (FHead = FBufCount) then

FHead := 0;

end;

procedure TQueuedBuffers.AdvanceTail;

begin

inc(FTail);

if (FTail = FBuf Count) then

FTail := 0;

end;

function TQueuedBuffers.qbGetHead : PBuffer;

begin

Result := FBuffers^[FHead];

end;

function TQueuedBuffers.qbGetTail : PBuffer;

begin

Result := FBuffers^[FTail];

end;

Менее очевидно то, что указатели начала и конца очереди не должны быть защищены от изменений критическими разделами или какими-то аналогичными элементами. На первый взгляд это кажется противоречащим здравому смыслу и всем правилам совместного использования данных в различных потоках. Однако поток потребителя никогда не будет обращаться к указателю конца очереди. О наличии данных, которые нужно считать из указателя начала очереди, ему будет сообщать поток производителя (в этот момент времени указатели начала и конца очереди будут различными). Аналогично, поток производителя никогда не будет обращаться к указателю начала очереди, поскольку о наличии места для добавления данных в конце очереди ему будет сообщать поток потребителя.

Коды реализации классов производителя и потребителя приведены в листинге 12.13. Эти классы являются производными от класса TThread. Код реализации каждого из перекрытых методов Execute не отличается от ранее описанного. Поток производителя входит в цикл. На каждом шаге цикла он вызывает метод StartProducer объекта синхронизации, а затем считывает блок данных из исходного потока в буфер в конце очереди. После этого он смещает указатель конца очереди. И, в заключение, он вызывает метод StopProducing и повторяет цикл с начала. Выполнение цикла прекращается, как только поток производителя устанавливает буфер в состояние, соответствующее отсутствию в нем каких-либо данных (потребитель воспринимает это состояние в качестве признака "конец потока").

В свою очередь, цикл потока потребителя выполняется следующим образом. Вначале поток вызывает метод StartConsuming объекта синхронизации. Возврат из этого метода свидетельствует об отсутствии данных для считывания в объекте поставленных в очередь буферов. Поток считывает данные из буфера, определяемого указателем начала очереди, и записывает их в поток назначения. Затем он смещает указатель начала очереди. Сразу после считывания всех данных из заполненного буфера он вызывает метод StopConsuming объекта синхронизации и повторяет цикл сначала. Работа потребителя останавливается при получении им пустого буфера.

Листинг 12.13. Классы производителя и потребителя

type

TProducer = class (TThread) private

FBuffers : TQueuedBuffers;

FStream : TStream;

FSyncObj : TtdProduceConsumeSync;

protected

procedure Execute; override;

public

constructor Create(aStream : TStream;

aSyncObj : TtdProduceConsumeSync;

aBuffers : TQueuedBuffers);

end;

constructor TProducer.Create(aStream : TStream;

aSyncObj : TtdProduceConsumeSync;

aBuffers : TQueuedBuffers);

begin

inherited Create (true);

FStream := aStream;

FSyncObj :=,aSyncObj;

FBuffers aBuffers;

end;

procedure TProducer.Execute;

var

Tail : PBuffer;

begin

{выполнять до момента опустошения потока...}

repeat

{сигнализировать о готовности к началу генерирования данных}

FSyncObj.StartProducing;

{считать блок из потока в конечный буфер}

Tail FBuffers.Tail;

Tail^.bCount := FStream.Read(Tail^.bBlock, BufferSize);

{переместить указатель конца очереди}

FBuffers.AdvanceTail;

{поскольку выполняется запись нового буфера, необходимо сигнализировать о созданных данных}

FSyncObj.StopProducing;

until (Tail^.bCount ? 0);

end;

type

TConsumer = class(TThread) private

FBuffers : TQueuedBuffers;

FStream : TStream;

FSyncObj : TtdProduceConsumeSync;

protected

procedure Execute; override;

public

constructor Create(aStream : TStream;

aSyncObj : TtdProduceConsumeSync;

aBuffers : TQueuedBuffers);

end;

constructor TConsumer.Create(aStream : TStream;

aSyncObj : TtdProduceConsumeSync;

aBuffers : TQueuedBuffers);

begin

inherited Create (true);

FStream := aStream;

FSyncObj := aSyncObj;

FBuffers := aBuffers;

end;

procedure TConsumer.Execute;

var

Head : PBuffer;

begin

{сигнализировать о готовности к началу потребления данных}

FSyncObj.StartConsuming;

{извлечь начальный буфер}

Head := FBuffers.Head;

{до тех пор, пока начальный буфер не опустошен...}

while (Head^.bCount <> 0) do

begin

{выполнить запись блока из начального буфера в поток}

FStream.Write(Head^.bBlock, Head^.bCount);

{переместить указатель начала очереди}

FBuffers.AdvanceHead;

{поскольку было выполнено считывание и обработка буфера, необходимо сообщить о том, что данные были использованы}

FSyncObj.StopConsuming;

{сигнализировать о готовности снова приступить к потреблению данных}

FSyncObj.StartConsuming;

{извлечь начальный буфер}

Head := FBuffers.Head;

end;

end;

И, наконец, мы можем рассмотреть подпрограмму копирования потока, приведенную в листинге 12.14. Она принимает два параметра: входной поток и выходной поток. Подпрограмма создает специальный объект типа TQueuedBuffers. Этот объект содержит все ресурсы и методы, необходимые для реализации организованного в виде очереди набора буферов. Он создает также экземпляр класса TtdProducerConsumerSync, который будет действовать в качестве объекта синхронизации, обеспечивающего согласованную работу производителя и потребителя.