I need to terminate my running thread in a way that doesn't cause an error or delay. The problem lies in the function "lMsg := lMsgQueue.Get(FQueueGetTimeout);", where it will wait for the defined time (usually 5000 ms). Thus, if I need to call an external terminate, my application will be stuck waiting for the termination.
What would be the best way to terminate it in the middle of the process?
{ TConsumerThread }
constructor TConsumerThread.Create;
begin
FreeOnTerminate := True;
InitializeVars;
inherited Create(True);
end;
procedure TConsumerThread.Execute;
var
lMsgQueue: TAMQPMessageQueue;
lMsg: TAMQPMessage;
lStartTime: TDateTime;
begin
lMsgQueue := TAMQPMessageQueue.Create;
FChannelAMQPThread := FConnectionAMQP.OpenChannel(FQueuePrefetchSize, FQueuePrefetchCount);
try
try
FChannelAMQPThread.BasicConsume(lMsgQueue, FQueue, 'Consumer');
lStartTime := Now;
repeat
try
try
if not(FConnectionAMQP.IsOpen) then
BEGIN
FConnectionAMQP.Connect;
FChannelAMQPThread := FConnectionAMQP.OpenChannel(FQueuePrefetchSize, FQueuePrefetchCount);
FChannelAMQPThread.BasicConsume(lMsgQueue, FQueue, 'Consumer';
END;
except
on E: Exception do
Break;
end;
lMsg := lMsgQueue.Get(FQueueGetTimeout);
if (lMsg = nil) and not(Terminated) then
begin
if Assigned(FChannelAMQPThread) then
begin
FConnectionAMQP.CloseChannel(FChannelAMQPThread);
FChannelAMQPThread := nil;
end;
FChannelAMQPThread := FConnectionAMQP.OpenChannel(FQueuePrefetchSize, FQueuePrefetchCount);
FChannelAMQPThread.BasicConsume(lMsgQueue, FQueue, 'Consumer');
end;
if not(Terminated) then
begin
try
if not(FConnectionAMQP.IsOpen) then
FConnectionAMQP.Connect;
except
on E: Exception do
Break;
end;
end;
if not(Terminated) then
begin
if ValidateFilter(lMsg) then
begin
FCorrelationID := lMsg.Header.PropertyList.CorrelationID.Value;
FReceivedMessage := lMsg.Body.asString[TEncoding.ASCII];
lMsg.Ack;
lMsg.Free;
Terminate;
end
else
begin
lMsg.Reject;
lMsg.Free;
if not(FTimeout = INFINITE) then
begin
if (MilliSecondsBetween(Now, lStartTime) >= (Int64(FTimeout))) then
begin
FReceivedMessage := '';
Terminate;
end;
end;
end;
end
else
begin
Terminate;
end;
except
on E: Exception do
begin
if Assigned(lMsg) then
begin
lMsg.Free;
lMsg := nil;
end;
end;
end;
until (Terminated);
except
on E: Exception do
begin
FReceivedMessage := '';
if not(Terminated) then
Terminate;
end;
end;
finally
lMsgQueue.Free;
end;
end;
procedure TConsumerThread.TerminatedSet;
begin
inherited;
if Assigned(FChannelAMQPThread) then
begin
try
if FConnectionAMQP.IsOpen then
FConnectionAMQP.CloseChannel(FChannelAMQPThread);
except
on E: Exception do
end;
FChannelAMQPThread := nil;
end;
end;
function TConsumerThread.ValidateFilter(pMsg: TAMQPMessage): Boolean;
begin
Result := False;
case FMsgFilter of
fmsgNone:
Result := True;
fmsgMessageID:
Result := (pMsg.Header.PropertyList.MessageID.Value = FFilterValue);
fmsgCorrelationID:
Result := (pMsg.Header.PropertyList.CorrelationID.Value = FFilterValue);
end;
end;
procedure TConsumerThread.InitializeVars;
begin
FConnectionAMQP := nil;
FChannelAMQPThread := nil;
FQueue := '';
FTimeout := INFINITE;
FQueueGetTimeout := 5000;
FQueuePrefetchSize := 0;
FQueuePrefetchCount := 10;
FMsgFilter := fmsgNone;
FFilterValue := '';
FReceivedMessage := '';
end;
To check the returned message or exception, I am using a function in OnTerminate.
Also, making it "FreeOnTerminate" in this case is the best alternative?
I start it suspended because I set the properties (initialized in InitializeVars) before the start.
This code is from the "Get" function, I didn't write it, but I can edit it if necessary.
{$I AMQP.Options.inc}
unit AMQP.Classes;
interface
Uses
SysUtils, Classes, SyncObjs, Generics.Collections,
AMQP.Frame, AMQP.Message, AMQP.Method, AMQP.Types
{$IfDef fpc}
, AMQP.SyncObjs
{$EndIf}
;
Type
AMQPException = Class(Exception);
AMQPTimeout = class(AMQPException);
TAMQPServerProperties = Class
Strict Private
FCapabilities : TStringList;
FMechanisms : TStringList;
FLocales : TStringList;
FClusterName : String;
FCopyright : String;
FInformation : String;
FPlatform : String;
FProduct : String;
FVersion : String;
FKnownHosts : String;
FVersionMajor : Integer;
FVersionMinor : Integer;
FChannelMax : Integer;
FFrameMax : Integer;
FHeartbeat : Integer;
Public
Property Capabilities : TStringList read FCapabilities;
Property Mechanisms : TStringList read FMechanisms;
Property Locales : TStringList read FLocales;
Property ClusterName : String read FClusterName;
Property Copyright : String read FCopyright;
Property Information : String read FInformation;
Property &Platform : String read FPlatform;
Property Product : String read FProduct;
Property Version : String read FVersion;
Property KnownHosts : String read FKnownHosts;
Property ProtocolVersionMajor : Integer read FVersionMajor;
Property ProtocolVersionMinor : Integer read FVersionMinor;
Property ChannelMax : Integer read FChannelMax;
Property FrameMax : Integer read FFrameMax;
Property Heartbeat : Integer read FHeartbeat;
Procedure ReadConnectionStart( AConnectionStart: TAMQPMethod );
Procedure ReadConnectionTune( AConnectionTune: TAMQPMethod );
Procedure ReadConnectionOpenOK( AConnectionOpenOK: TAMQPMethod );
Constructor Create;
Destructor Destroy; Override;
End;
TBlockingQueue<T> = Class
Strict Protected
FGuard : {$IFDEF FPC}TRTLCriticalSection{$ELSE}TCriticalSection{$ENDIF};
FCondition : TConditionVariableCS;
FQueue : TQueue<T>;
Public
Function Count: Integer; Virtual;
Function Get(ATimeOut: LongWord): T; Virtual;
Procedure Put( Item: T ); Virtual;
Constructor Create; Virtual;
Destructor Destroy; Override;
End;
TAMQPQueue = TBlockingQueue<TAMQPFrame>;
TAMQPMessageQueue = TBlockingQueue<TAMQPMessage>;
implementation
{ TAMQPServerProperties }
constructor TAMQPServerProperties.Create;
begin
FCapabilities := TStringList.Create;
FMechanisms := TStringList.Create;
FLocales := TStringList.Create;
FMechanisms.StrictDelimiter := True;
FMechanisms.Delimiter := ' ';
FLocales.StrictDelimiter := True;
FLocales.Delimiter := ' ';
FClusterName := '';
FCopyright := '';
FInformation := '';
FPlatform := '';
FProduct := '';
FVersion := '';
FKnownHosts := '';
FVersionMajor := 0;
FVersionMinor := 0;
FChannelMax := 0;
FFrameMax := 0;
FHeartbeat := 0;
end;
Procedure TAMQPServerProperties.ReadConnectionStart( AConnectionStart: TAMQPMethod );
var
ServerProperties: TFieldTable;
ServerCapabilities: TFieldTable;
Pair: TFieldValuePair;
begin
FMechanisms.DelimitedText := AConnectionStart.Field['mechanisms'].AsLongString.Value;
FLocales.DelimitedText := AConnectionStart.Field['locales'].AsLongString.Value;
ServerProperties := AConnectionStart.Field['server-properties'].AsFieldTable;
FVersionMajor := AConnectionStart.Field['version-major'].AsShortShortUInt.Value;
FVersionMinor := AConnectionStart.Field['version-minor'].AsShortShortUInt.Value;
FClusterName := ServerProperties.Field['cluster_name'].AsShortString.Value;
FCopyright := ServerProperties.Field['copyright'].AsShortString.Value;
FInformation := ServerProperties.Field['information'].AsShortString.Value;
FPlatform := ServerProperties.Field['platform'].AsShortString.Value;
FProduct := ServerProperties.Field['product'].AsShortString.Value;
FVersion := ServerProperties.Field['version'].AsShortString.Value;
ServerCapabilities := ServerProperties.Field['capabilities'].AsFieldTable;
for Pair in ServerCapabilities do
FCapabilities.Values[ Pair.Name.Value ] := Pair.Value.AsString('');
end;
Procedure TAMQPServerProperties.ReadConnectionTune( AConnectionTune: TAMQPMethod );
begin
FChannelMax := AConnectionTune.Field['channel-max'].AsShortUInt.Value;
FFrameMax := AConnectionTune.Field['frame-max'].AsLongUInt.Value;
FHeartbeat := AConnectionTune.Field['heartbeat'].AsShortUInt.Value;
end;
Procedure TAMQPServerProperties.ReadConnectionOpenOK( AConnectionOpenOK: TAMQPMethod );
begin
FKnownHosts := AConnectionOpenOK.Field['known-hosts'].AsShortString.Value;
end;
destructor TAMQPServerProperties.Destroy;
begin
FCapabilities.Free;
FMechanisms.Free;
FLocales.Free;
inherited;
end;
{ TBlockingQueue<T> }
function TBlockingQueue<T>.Count: Integer;
begin
{$IFDEF FPC}
EnterCriticalSection(FGuard);
{$ELSE}
FGuard.Acquire;
{$ENDIF}
try
Result := FQueue.Count;
finally
{$IFDEF FPC}
LeaveCriticalSection(FGuard);
{$ELSE}
FGuard.Release;
{$ENDIF}
end;
end;
constructor TBlockingQueue<T>.Create;
begin
inherited;
{$IFDEF FPC}
InitCriticalSection(FGuard);
{$ELSE}
FGuard := TCriticalSection.Create;
{$ENDIF}
FCondition := TConditionVariableCS.Create;
FQueue := TQueue<T>.Create;
end;
destructor TBlockingQueue<T>.Destroy;
begin
FQueue.Free;
FQueue := nil;
FCondition.Free;
FCondition := nil;
{$IFDEF FPC}
DoneCriticalSection(FGuard);
{$ELSE}
FGuard.Free;
FGuard := nil;
{$ENDIF}
inherited;
end;
function TBlockingQueue<T>.Get(ATimeOut: LongWord): T;
begin
{$IFDEF FPC}
EnterCriticalSection(FGuard);
{$ELSE}
FGuard.Acquire;
{$ENDIF}
try
while FQueue.Count = 0 do
begin
{$IFDEF FPC}
if FCondition.WaitForRTL(FGuard, ATimeOut) = wrTimeout then
{$Else}
if FCondition.WaitFor(FGuard, ATimeOut) = wrTimeout then
{$EndIf}
raise AMQPTimeout.Create('Timeout!');
end;
Result := FQueue.Dequeue
finally
{$IFDEF FPC}
LeaveCriticalSection(FGuard);
{$ELSE}
FGuard.Release;
{$ENDIF}
end;
end;
procedure TBlockingQueue<T>.Put(Item: T);
begin
{$IFDEF FPC}
EnterCriticalSection(FGuard);
{$ELSE}
FGuard.Acquire;
{$ENDIF}
try
FQueue.Enqueue( Item );
FCondition.ReleaseAll;
finally
{$IFDEF FPC}
LeaveCriticalSection(FGuard);
{$ELSE}
FGuard.Release;
{$ENDIF}
end;
end;
end.
Best Answer
TBlockingQueue<T>.Get()
is waiting on aTConditionVariableCS
to be signaled up to the specified timeout. To make that exit more quickly, you would have to signal the ConditionVariable even if nothing was added to the queue. If you did that, you would have to updateGet()
to make sure the queue is not empty before dequeueing it, and have a flag somewhere that the cancel is intentional. Then you would just set that flag and signal the ConditionVariable when you want to cancel the wait.But in your case, it looks like your thread is queuing object pointers, and already handling
nil
pointers, so updatingGet()
is unnecessary. When terminating the thread, simply queue up anil
object pointer as your flag, and then the thread can check itsTerminated
property when it receives anil
pointer from the queue.Something like this: