Чтение онлайн

ЖАНРЫ

Фундаментальные алгоритмы и структуры данных в Delphi

Бакнелл Джулиан М.

Шрифт:

Полный исходный код реализации класса TtdProduceManyConsumeSync можно найти на Web-сайте издательства, в разделе материалов. После выгрузки материалов отыщите среди них файл TDPCSync.pas.

В качестве примера программы мы рассмотрим подпрограмму многопоточного копирования, выполняющую копирование потока в три других потока. Как и в случае примера, приведенного в листинге 12.14, производитель будет считывать исходный поток в буфера, количество которых может доходить до 20. Потребители, количество которых теперь равняется трем, будут считывать буфера

и выполнять запись в собственные потоки.

Класс TQueuedBuffers (листинг 12.19) должен быть несколько изменен, поскольку ему необходимо хранить указатель начала очереди для нескольких потребителей и, следовательно, он должен содержать массив таких указателей.

Листинг 12.19. Класс TQueuedBuffers для модели с несколькими потребителями type

PBuffer = ^TBuffer;

TBuffer = packed record

bCount : longint;

bBlock : array [0..pred(BufferSize) ] of byte;

end;

PBufferArray = ^TBufferArray;

TBufferArray = array [0..pred(MaxBuffers) ] of PBuffer;

TQueuedBuffers = class private

FBufCount : integer;

FBuffers : PBufferArray;

FConsumerCount : integer;

FHead : array [0..pred(MaxConsumers)] of integer;

FTail : integer;

protected

function qbGetHead(aInx : integer): PBuffer;

function qbGetTail : PBuffer;

public

constructor Create(aBufferCount : integer;

aConsumerCount : integer);

destructor Destroy; override;

procedureAdvanceHead(aConsumerId : integer);

procedure AdvanceTail;

property Head [aInx : integer] : PBuffer read qbGetHead;

property Tail : PBuffer read qbGetTail;

property ConsumerCount : integer read FConsumerCount;

end;

constructor TQueuedBuffers.Create(aBufferCount : integer;

aConsumerCount : integer);

var

i : integer;

begin

inherited Create;

{распределить буферы}

FBuffers := AllocMem(aBufferCount * sizeof(pointer));

for i := 0 to pred(aBufferCount) do

GetMem(FBuffers^[i], sizeof(TBuffer));

FBufCount := aBufferCount;

FConsumerCount := aConsumerCount;

end;

destructor TQueuedBuffers.Destroy;

var

i : integer;

begin

{освободить буферы}

if (FBuffers <> nil) then begin

for i := 0 to pred(FBufCount) do

if (FBuffers^[i] <> nil) then

FreeMem(FBuffers^[i], sizeof(TBuffer));

FreeMem(FBuffers, FBufCount * sizeof(pointer));

end;

inherited Destroy;

end;

procedure TQueuedBuffers.AdvanceHead(aConsumerId : integer);

begin

inc(FHead[aConsumerId]);

if (FHead[aConsumerId] = FBufCount) then

FHead[aConsumerId] := 0;

end;

procedure TQueuedBuffers.AdvanceTail;

begin

inc(FTail);

if (FTail = FBufCount) then

FTail := 0;

end;

function TQueuedBuffers.qbGetHead(aInx : integer): PBuffer;

begin

Result := FBuffers^[FHead[aInx]];

end;

function TQueuedBuffers.qbGetTail : PBuffer;

begin

Result := FBuffers^ [FTail];

end;

Следующей

мы рассмотрим реализацию классов производителя и потребителя (листинг 12.20). Класс производителя претерпел не слишком много изменений по сравнению с предыдущей реализацией, в то время как класс потребителя теперь содержит идентификационный номер, посредством которого он обращается к объекту буферов для получения нужного указателя начала очереди.

Листинг 12.20. Классы производителя и потребителя

type

TProducer * class(TThread) private

FBuffers : TQueuedBuffers;

FStream : TStream;

FSyncObj : TtdProduceManyConsumeSync;

protected

procedure Execute; override;

public

constructor Create(aStream : TStream;

aSyncObj : TtdProduceManyConsumeSync;

aBuffers : TQueuedBuffers);

end;

constructor TProducer.Create(aStream : TStream;

aSyncObj : TtdProduceManyConsumeSync;

aBuffers : TQueuedBuffers);

begin

inherited Create (true);

FStream := aStream;

FSyncObj := aSyncObj;

FBuffers := aBuffers;

end;

procedure TProducer.Execute;

var

Tail : PBuffer;

begin

{выполнять до тех nop, пока поток не будет исчерпан...}

repeat

{передать сигнал о готовности к началу генерации данных}

FSyncObj.StartProducing;

{выполнить считывание блока из потока в конечный буфер очереди}

Tail := FBuffers.Tail;

Tail74.bCount := FStream.Read (Tail^.ЬВ1оск, 1024);

{переместить указатель конца очереди}

FBuffers.AdvanceTail;

{передать сигнал о прекращении генерации данных}

FSyncObj.StopProducing;

until (Tail^.bCount = 0);

end;

type

TConsumer = class (TThread) private

FBuffers : TQueuedBuffers;

FID : integer;

FStream : TStream;

FSyncObj : TtdProduceManyConsumeSync;

protected

procedure Execute; override;

public

constructor Create(aStream : TStream;

aSyncObj : TtdProduceManyConsumeSync;

aBuffers : TQueuedBuffers;

alD : integer);

end;

constructor TConsumer.Create(aStream : TStream;

aSyncObj : TtdProduceManyConsumeSync;

aBuffers : TQueuedBuffers;

alD : integer);

begin

inherited Create (true);

FStream := aStream;

FSyncObj := aSyncObj;

FBuffers := aBuffers;

FID := alD;

end;

Поделиться с друзьями: