home *** CD-ROM | disk | FTP | other *** search
- { 10-05-1999 10:38:00 PM > [martin on MARTIN] checked out /Reformatting
- according to Delphi guidelines. }
- { 10-05-1999 1:31:20 AM > [martin on MARTIN] update: Including winsock
- message in fatal error (0.4) / }
- { 08-05-1999 7:04:00 PM > [martin on MARTIN] checked out /Including winsock
- message in fatal error }
- { 14-04-1999 11:59:23 PM > [martin on MARTIN] update: Changing dynamic
- methods to virtual. (0.3) / }
- { 14-04-1999 11:54:16 PM > [martin on MARTIN] checked out /Changing dynamic
- methods to virtual. }
- { 14-04-1999 11:19:53 PM > [martin on MARTIN] update: Modifying code that
- checks whether OK to send, so that structures can be sent to the transaction
- manager whilst the associated socket is connecting. This means that the ONCP does not require a cache. (0.2) / }
- { 06-04-1999 9:14:07 PM > [martin on MARTIN] checked out /Modifying code
- that checks whether OK to send, so that structures can be sent to the transaction
- manager whilst the associated socket is connecting. This means that the ONCP does not require a cache. }
- { 06-04-1999 7:44:46 PM > [martin on MARTIN] checked out /Modifying class
- names }
- { 06-04-1999 1:46:40 AM > [martin on MARTIN] check in: (0.0) Initial Version
- / None }
- unit MCHTransactions;
-
- {Martin Harvey 20/10/1997
-
- This unit implements a socket suitable for connection to
- a transaction manager, and it also defines the transaction
- manager.
- The transaction manager is an intermediary between the socket
- and the Data Object Protocol Manager
-
- The basic structure is as follows:
-
- TTransStreamSock<--->TMCHTransactionManager<---->TMCHDopManager
- The TMCHDopManager will be left to a separate unit.
- }
-
- {
- This unit has been slightly modified. 7/11/97
- Whereas previously, the Transaction manager was created upon connection
- establishment, it is now created and destroyed at the same time as the socket
- object. However, the transaction manager now resets itself when a connection
- on the socket is created or destroyed.
-
- In addition, the stream socket now allows for the creation of derived classes from
- TMCHTransactionManager
- }
-
- { Yet more modifications, Martin Harvey 30/8/98
-
- Due to various requirements, like having transaction managers that save to disks
- and to pipes, I'm creating a custom ancestor class that connects to a DOP,
- and then subclasses that work with sockets, files, pipes etc etc}
-
- interface
-
- uses MCHMemoryStream,Classes,DWinsock,SysUtils,WSAPI;
-
- type
-
- TFatalErrorEvent = procedure(Sender:TObject;Msg:string) of object;
-
- TMCHCustomTransactionManager = class
- private
- FOnReset:TNotifyEvent;
- FOnFatalError:TFatalErrorEvent;
- FOnDestroy:TNotifyEvent;
- FOnTransactionRecieved:TNotifyEvent;
- FDOPManager:TObject;
- protected
- procedure DoTransactionRecieved;virtual;
- procedure DoFatalError(Msg:string);virtual;
- procedure DoDestroy;virtual;
- function GetActive:boolean;virtual;abstract;
- public
- destructor Destroy;override;
- property DOPManager:TObject
- read FDOPManager
- write FDOPManager;
- property OnTransactionRecieved:TNotifyEvent
- read FOnTransactionRecieved
- write FOnTransactionRecieved;
- {informs user when it is about to be destroyed}
- property OnDestroy:TNotifyEvent
- read FOnDestroy
- write FOnDestroy;
- {informs user when a reset event is about to occur}
- property OnReset:TNotifyEvent
- read FOnReset
- write FOnReset;
- property OnFatalError:TFatalErrorEvent
- read FOnFatalError
- write FOnFatalError;
- property Active:boolean
- read GetActive;
- constructor Create;
- procedure WriteTransactionFromStream(ExternalIn:TStream);virtual;abstract;
- procedure ReadTransactionToStream(ExternalOut:TStream);virtual;abstract;
- procedure Reset;virtual;
- published
- end;
-
- TMCHTransactionManager = class; {forward declaration}
-
- TCustomManagerClass = class of TMCHCustomTransactionManager;
-
- TTransStreamSock = class(TStreamSocket)
- private
- FManager:TMCHCustomTransactionManager;
- protected
- {these methods normally call the parent component
- and allow delegation through the parent component to the form
- doing the delegation.
- I am overriding them to provide a direct way of letting the transaction
- manager get to the data first.}
- procedure DoAccept;override;
- procedure DoConnect;override;
- procedure DoDisconnect;override;
- procedure DoReject;override;
- procedure DoTimeout;override;
- procedure DoWrite;override;
- procedure DoRead;override;
- public
- property Manager:TMCHCustomTransactionManager read FManager write FManager;
- {connection to the manager object to allow direct access to it}
- constructor Create(AParent:TSockCtrl);override;
- {note that set manager class should only be called immediately
- after the socket is created}
- destructor Destroy;override;
- published
- end;
-
- TMCHTransactionManager = class(TMCHCustomTransactionManager)
- private
- FTransSock:TTransStreamSock;
- FInputStream,
- FOutputStream:
- TMCHMemoryStream;
- FReadingTransaction:boolean;
- FSocketReady:boolean;
- FReadTransactionStart,
- FReadTransactionEnd:integer;
- FWriteSocketOffset:integer;
- protected
- {offset of current transaction in read stream}
- property ReadTransactionStart:integer
- read FReadTransactionStart
- write FReadTransactionStart;
- {projected end of current transaction}
- property ReadTransactionEnd:integer
- read FReadTransactionEnd
- write FReadTransactionEnd;
- {position of writing data to socket}
- property WriteSocketOffset:integer
- read FWriteSocketOffset
- write FWriteSocketOffset;
- {stream properties}
- property InputStream:TMCHMemoryStream
- read FInputStream
- write FInputStream;
- property OutputStream:TMCHMemoryStream
- read FOutputStream
- write FOutputStream;
- {internal procedures}
- procedure TrimReadStream;
- procedure TrimWriteStream;
- {write to socket puts as many bytes as possible from ouput stream to socket,
- and returns number put}
- function WriteToSocket(Sock:TStreamSocket;Stream:TStream;pos,size:integer):integer;
- { read from socket reads as many bytes as possible to the end of the input
- stream, and returns how many bytes read}
- function ReadFromSocket(Sock:TStreamSocket;Stream:TStream):integer;
- function GetSocketReady:boolean;
- procedure ExtractTransactionsFromStream;
- function GetActive:boolean;override;
- public
- {user accessible properties}
- {owning socket}
- property TransSock:TTransStreamSock read FTransSock write FTransSock;
- {Reading side of things}
- {user event notifying that a transaction is ready}
- {note: this is the only chance that external code
- has of picking up the transaction, pointers are automatically incremented
- afterwards! (This may be changed later)}
- property OnTransactionRecieved;
- {informs user when it is about to be destroyed}
- property OnDestroy;
- {informs user when a reset event is about to occur}
- property OnReset;
- { an unfinished transaction is in the read stream }
- { This does not preclude a very short transaction
- existing. (ie header is too short to read length}
- { This property doesn't have a write specifier,
- so external objects can't modify it}
- property ReadingTransaction:boolean
- read FReadingTransaction;
- { The socket ready property is used to ensure that
- only one send operation is called for every OnWrite event}
- property SocketReady:boolean
- read GetSocketReady;
- property Active;
- {constructor and destructor.
- These do not need to be called manually, since the
- owning socket will take care of this }
- constructor Create;
- destructor Destroy;override;
- {user methods for writing transactions}
- { The input stream should contain all the data for one transaction}
- { Note that the outside agent owns the external stream}
- procedure WriteTransactionFromStream(ExternalIn:TStream);override;
- {user methods for reading transactions}
- {The output stream contains a copy of the transaction data
- This should be called externally when an OnTransactionRecieved event occurs}
- {Note that the outside agent owns the external stream}
- procedure ReadTransactionToStream(ExternalOut:TStream);override;
- {methods for owning socket to call.
- The user should *not* call these!!}
- {writing side of things}
- procedure WriteDataRemainder;
- {reading side}
- procedure ReadNewData;
- { a reset procedure to be used when the connection underlying the socket has
- been closed}
- procedure Reset;override;
- published
- property OnFatalError;
- end;
-
- implementation
-
- uses MCHDOPManager,Dialogs;
-
- constructor TMCHCustomTransactionManager.Create;
- begin
- inherited Create;
- DOPManager := TMCHDopManager.Create;
- (DOPManager as TMCHDopManager).DOPTransactionManager := Self;
- end;
-
- destructor TMCHCustomTransactionManager.Destroy;
- begin
- DoDestroy;
- DOPManager.Free;
- DOPManager := nil;
- inherited Destroy;
- end;
-
- procedure TMCHCustomTransactionManager.DoFatalError(Msg:string);
- begin
- if Assigned(FOnFatalError) then FOnFatalError(Self,Msg);
- end;
-
- procedure TMCHCustomTransactionManager.Reset;
- begin
- if Assigned(DOPManager) then
- (DOPManager as TMCHDopManager).Reset;
- if Assigned(FOnReset) then FOnReset(Self);
- end;
-
- procedure TMCHCustomTransactionManager.DoTransactionRecieved;
- begin
- if (Assigned(FOnTransactionRecieved)) then
- FOnTransactionRecieved(Self);
- try
- if Assigned(DOPManager) then
- (DOPManager as TMCHDopManager).GetIncomingTransaction(Self);
- except
- on E:ENetworkError do begin
- ShowMessage('Exception raised on remote machine, '
- + E.RemoteExceptionName + ' ' +
- E.Message);
- end;
- end;
- end;
-
- procedure TMCHCustomTransactionManager.DoDestroy;
- begin
- if (Assigned(FOnDestroy)) then
- FOnDestroy(Self);
- end;
-
-
- procedure TTransStreamSock.DoAccept;
- begin
- if LastError = 0 then
- begin
- if Assigned(Manager) then
- Manager.Reset;
- end;
- inherited DoAccept;
- end;
-
- procedure TTransStreamSock.DoConnect;
- begin
- {Note that this has been modified. We now only
- reset the transaction manager if the connection *fails*}
- if LastError <> 0 then
- begin
- if Assigned(Manager) then
- Manager.Reset;
- end;
- inherited DoConnect;
- end;
-
- procedure TTransStreamSock.DoDisconnect;
- begin
- if Assigned(Manager) then
- Manager.Reset;
- inherited DoDisconnect;
- end;
-
- procedure TTransStreamSock.DoReject;
- begin
- if Assigned(Manager) then
- Manager.Reset;
- inherited DoReject;
- end;
-
- procedure TTransStreamSock.DoTimeOut;
- begin
- if Assigned(Manager) then
- Manager.Reset;
- inherited DoTimeOut;
- end;
-
- procedure TTransStreamSock.DoWrite;
- begin
- if Assigned(Manager) then
- (Manager as TMCHTransactionManager).WriteDataRemainder;
- inherited DoWrite;
- end;
-
- procedure TTransStreamSock.DoRead;
- begin
- if Assigned(Manager) then
- (Manager as TMCHTransactionManager).ReadNewData;
- inherited DoRead;
- end;
-
- destructor TTransStreamSock.Destroy;
- begin
- if Assigned(Manager) then
- begin
- Manager.Free;
- Manager := nil;
- end;
- inherited Destroy;
- end;
-
- constructor TTransStreamSock.Create(AParent:TSockCtrl);
- begin
- inherited Create(AParent);
- Manager := TMCHTransactionManager.Create;
- (Manager as TMCHTransactionManager).TransSock := Self;
- end;
-
-
- function TMCHTransactionManager.GetSocketReady:boolean;
- begin
- result := FTransSock.Connected and FSocketReady;
- end;
-
- {$WARNINGS OFF}
-
- function TMCHTransactionManager.ReadFromSocket(Sock:TStreamSocket;Stream:TStream):integer;
-
- const
- blocksize = 4096;
- var
- databuf:array[0..blocksize - 1] of byte;
- TotalBytesRead,BytesRead:integer;
-
- begin
- Stream.Seek(0,SoFromEnd); {seek to the end of the input stream}
- TotalBytesRead := 0;
- repeat
- try
- BytesRead := Sock.Recv(databuf,blocksize);
- except
- on E:ESockError do begin
- DoFatalError(E.Message);
- BytesRead := 0;
- end;
- end;
- Stream.WriteBuffer(Databuf,BytesRead);
- TotalBytesRead := TotalBytesRead + BytesRead;
- until BytesRead < blocksize;
- result := TotalBytesRead;
- end;
-
- {$WARNINGS ON}
-
- constructor TMCHTransactionManager.Create;
- begin
- inherited Create;
- FInputStream := TMCHMemoryStream.Create;
- FOutputStream := TMCHMemoryStream.Create;
- end;
-
- destructor TMCHTransactionManager.Destroy;
- begin
- FInputStream.Free;
- FOutputStream.Free;
- inherited Destroy;
- end;
-
- function TMCHTransactionManager.GetActive:boolean;
- begin
- result := true;
- if (WriteSocketOffset = OutputStream.Size)
- and (ReadTransactionStart = InputStream.Size)
- and (not ReadingTransaction)
- then result := false;
- end;
-
-
- procedure TMCHTransactionManager.Reset;
- {This procedure resets all pointers and clears the stream}
- begin
- inherited Reset;
- ReadTransactionStart := 0;
- ReadTransactionEnd := 0;
- WriteSocketOffset := 0;
- FReadingTransaction := false;
- InputStream.SetSize(0);
- OutputStream.SetSize(0);
- end;
-
- procedure TMCHTransactionManager.ReadNewData;
-
- var
- TransLength:integer;
-
- begin
- ReadFromSocket(TransSock,InputStream);
- if (not (ReadingTransaction)) then
- { No transactions pending, Read Start & Read End point to same place}
- begin
- if (InputStream.Size >= (ReadTransactionEnd + SizeOf(TransLength))) then
- begin
- InputStream.Seek(ReadTransactionEnd,soFromBeginning);
- InputStream.ReadBuffer(TransLength,SizeOf(TransLength));
- ReadTransactionStart := InputStream.Position;
- ReadTransactionEnd := ReadTransactionStart + TransLength;
- FReadingTransaction := true;
- end;
- end;
- ExtractTransactionsFromStream;
- end;
-
- procedure TMCHTransactionManager.ExtractTransactionsFromStream;
-
- var
- TransLength:integer;
-
- begin
- {do we have a complete transaction?}
- while ((ReadTransactionEnd <= InputStream.Size) and ReadingTransaction) do
- begin
- {we have a complete pending transaction}
- DoTransactionRecieved; {this will pick the transaction up if the user assigns a handler}
- {now advance pointers to the next transaction}
- ReadTransactionStart := ReadTransactionEnd;
- if (InputStream.Size >= ReadTransactionStart + SizeOf(TransLength)) then
- begin
- {we at least have the integer transaction length}
- InputStream.Seek(ReadTransactionEnd,soFromBeginning);
- InputStream.ReadBuffer(TransLength,SizeOf(TransLength));
- ReadTransactionStart := InputStream.Position;
- ReadTransactionEnd := ReadTransactionStart + TransLength;
- {now we know where the transaction ends and the pointers are valid}
- end
- else
- {we don't have anything to work with}
- FReadingTransaction := false;
- end;
- TrimReadStream;
- end;
-
-
- procedure TMCHTransactionManager.TrimReadStream;
-
- const
- MinStreamSize = 1024;
-
- var
- NewStream:TMCHMemoryStream;
-
- begin
- {General policy here:
- If more than 4/5 of the stream is not used, then clear it up.
- In all cases, all data before ReadTransactionStart can be discarded
- if stream less than 1k, then don't bother.
- }
- if (((InputStream.Size - ReadTransactionStart) * 5) < InputStream.Size)
- and (InputStream.Size > MinStreamSize) then
- begin
- NewStream := TMCHMemoryStream.Create;
- { do the work}
- {copy data from old to new stream}
- InputStream.Seek(ReadTransactionStart,soFromBeginning);
- if (InputStream.Size - ReadTransactionStart) > 0 then
- NewStream.CopyFrom(InputStream,(InputStream.Size - ReadTransactionStart));
- {Swap stream ownership and destroy old stream}
- InputStream.Free;
- InputStream := NewStream;
- {reset pointers accordingly}
- ReadTransactionEnd := ReadTransactionEnd - ReadTransactionStart;
- ReadTransactionStart := 0;
- end;
- end;
-
-
- procedure TMCHTransactionManager.ReadTransactionToStream(ExternalOut:TStream);
- {This will copy the current transaction, but will not modify any pointers}
- begin
- InputStream.Seek(ReadTransactionStart,soFromBeginning);
- ExternalOut.CopyFrom(InputStream,ReadTransactionEnd - ReadTransactionStart);
- end;
-
- {$WARNINGS OFF}
-
- function TMCHTransactionManager.WriteToSocket(Sock:TStreamSocket;Stream:TStream;pos,size:integer):integer;
-
- const
- blocksize = 4096;
-
- var
- databuf:array[0..blocksize - 1] of byte;
- TotalBytesWritten,BytesWritten,BlockBytesToWrite,BytesToWrite:integer;
-
- {write as much as we can to the socket until it's full}
-
- begin
- Stream.Seek(pos,soFromBeginning);
- BytesToWrite := size;
- TotalBytesWritten := 0;
- repeat
- if BytesToWrite > blocksize then
- BlockBytesToWrite := blocksize
- else
- BlockBytesToWrite := BytesToWrite;
- Stream.ReadBuffer(databuf,BlockBytesToWrite);
- try
- BytesWritten := Sock.Send(databuf,BlockBytesToWrite);
- except
- on E:ESockError do
- begin
- DoFatalError(E.Message);
- BytesWritten := 0;
- end;
- end;
- TotalBytesWritten := TotalBytesWritten + BytesWritten;
- BytesToWrite := BytesToWrite - BytesWritten
- until (BytesToWrite = 0) or (BytesWritten < BlockBytesToWrite);
- result := TotalBytesWritten;
- end;
-
- {$WARNINGS ON}
-
- procedure TMCHTransactionManager.WriteDataRemainder;
-
- var
- BytesWritten:integer;
- BytesToWrite:integer;
-
- begin
- FSocketReady := true;
- {socket is ready for more data to be written}
- BytesToWrite := OutputStream.Size - WriteSocketOffset;
- if BytesToWrite > 0 then
- begin
- {write as much as we can}
- BytesWritten := WriteToSocket(TransSock,OutputStream,WriteSocketOffset,BytesToWrite);
- WriteSocketOffset := WriteSocketOffset + BytesWritten;
- if BytesWritten < BytesToWrite then
- FSocketReady := false;
- { we'll get a callback ready for more data}
- end;
- TrimWriteStream;
- end;
-
-
- procedure TMCHTransactionManager.TrimWriteStream;
-
- const
- MinStreamSize = 1024;
-
- var
- NewStream:TMCHMemoryStream;
-
- begin
- {Same general policy as for trimming the read stream}
- if (((OutputStream.Size - WriteSocketOffset) * 5) < OutputStream.Size) and (OutputStream.Size > MinStreamSize) then
- begin
- NewStream := TMCHMemoryStream.Create;
- try
- {copy}
- OutputStream.Seek(WriteSocketOffset,soFromBeginning);
- if (OutputStream.Size - WriteSocketOffset) > 0 then
- NewStream.CopyFrom(OutputStream,(OutputStream.Size - WriteSocketOffset));
- {Swap ownership & destroy}
- OutputStream.Free;
- OutputStream := TMCHMemoryStream.Create;
- OutputStream.LoadFromStream(NewStream);
- WriteSocketOffset := 0;
- finally
- NewStream.Free;
- end;
- end;
- end;
-
-
- procedure TMCHTransactionManager.WriteTransactionFromStream(ExternalIn:TStream);
-
- var
- Length:integer;
-
- begin
- { write length data to output stream }
- Length := ExternalIn.Size;
- OutputStream.Seek(0,soFromEnd);
- OutputStream.WriteBuffer(Length,SizeOf(Length));
- ExternalIn.Seek(0,soFromBeginning);
- OutputStream.CopyFrom(ExternalIn,ExternalIn.Size);
- if SocketReady then
- WriteDataRemainder;
- {only force a send if the socket is empty and connected}
- end;
-
- end.
-
-