Hi everyone, I am quite new to this mailing list, but I hope someone can answer
my question.
The problem I am experiencing is as follows:
I get MPEG2 TS data from a realtime hardware encoder that is multicasting into
the local network. I am using TWSocket to receive the data packets. As some of
you might know, an MPEG2 Transport Stream (TS) consists of 188-byte packets,
each including a 4-byte header, which I can check to see whether the received
packet is OK.
Now, if I am working with only one thread, everything is fine. Once I add
a second thread (OnDataAvailable -> RingBuffer; RingBuffer -> Thread2 -> File),
it seems that OnDataAvailable isn't executed for every event, so the incoming
data gets buffered.
The hardware encoder packs 7 MPEG2 TS packets into one UDP packet, so the
standard packet size I get is 7 x 188 = 1316 bytes. Now, once I add the second
thread, the received sizes become multiples of that (e.g. 2632), and they all
share the same characteristics: the first header is damaged, the 2nd to 7th are
fine, and the 8th onwards (whatever the size) are empty or random data.
I was wondering whether TWSocket perhaps does not copy the second 1316-byte
packet into the buffer area after the first 1316-byte packet if OnDataAvailable
hasn't been called in between, but somewhere into the first bytes instead, which
would explain why the first header (with the 188 bytes following it) is broken.
This is what part of my debug file looks like:
1316 Byte udp package received
header 1 - OKAY, pid 256 -> payload video
header 2 - OKAY, pid 256 -> payload video
header 3 - OKAY, pid 256 -> payload video
header 4 - OKAY, pid 259 -> payload audio
header 5 - OKAY, pid 256 -> payload video
header 6 - OKAY, pid 256 -> payload video
header 7 - OKAY, pid 256 -> payload video
2632 Byte udp package received
header 1 - ERROR, pid 19440
header 2 - OKAY, pid 256
header 3 - OKAY, pid 256
header 4 - OKAY, pid 256
header 5 - OKAY, pid 256
header 6 - OKAY, pid 256
header 7 - OKAY, pid 256
header 8 - ERROR, pid 0
header 9 - ERROR, pid 0
header 10 - ERROR, pid 0
header 11 - ERROR, pid 0
header 12 - ERROR, pid 0
header 13 - ERROR, pid 0
header 14 - ERROR, pid 0
The code I am using is modelled on the TWSocket multithreading example. The code
looks like this:
type
TPos = RECORD
Buffersize, Blocksize : Int64;
CurrentWPos, CurrentRPos, StartPos : Int64;
Diskrepanz : Int64;
Richtung : Byte;
END;
TStreamThread = class(TThread)
private
FAddrDatei : Pointer;
BlockMem : Pointer;
procedure CalculateDiscrepancy;
procedure WriteMemToFile;
procedure ResetPosition;
public
FDatenspeicher : Pointer;
FPos : ^TPos;
FAddrLink : Pointer;
FAusgabe : TLabel;
Ffinalizethread: Boolean;
FExchangeFile : Boolean;
procedure SetLabel;
procedure Execute; override;
end;
TChangeThread = class(TThread)
private
procedure ExchangeFile;
procedure PrepareFile;
public
Mode : Byte;
procedure Execute; override;
end;
TRingBuffer = class(TObject)
private
Speicher : Pointer;
Groesse : Int64;
Start : Pointer;
Writer : Int64;
Reader : Int64;
Diskrepanz : Int64;
Richtung : Integer;
public
fReadyToWrite : TEvent;
fReadyToRead : TEvent;
CS : TCriticalSection;
constructor Create;
procedure GetData(pData : Pointer; pSize : Int64);
procedure PutData(pData : Pointer; pSize : Int64);
end;
var
Datei : ARRAY[0..1] OF TFileStream;
AddrDatei : Pointer;
AddrLink : Pointer;
DoExchange : Byte;
RootName : String;
Socket:TWSocket;
StreamThread : TStreamThread;
ChangeThread : TChangeThread;
ExchangeThread : TChangeThread;
public
{ Public-Deklarationen }
DataRes : Pointer;
Pos : ^TPos;
MyVar : Integer;
CritSec : TCriticalSection;
Wholesize : Int64;
CurrentFileSize: Int64;
WrittenSize : Int64;
Testvariable : Int64;
ReceivedPacks : Int64;
MaxSize : Int64;
PrerunSize : Int64;
FileCount : Integer;
StreamID : Integer;
RecordingID : Integer;
ErrorPacks : Int64;
procedure DataAvailable(Sender : TObject; Error : Word);
procedure WriteToMem;
end;
var
Form2: TForm2;
implementation
{$R *.dfm}
{procedure TForm2.SetLabel(lname: string);
begin
StatusLA.Caption:=lname;
end; }
constructor TForm2.TRingBuffer.Create;
begin
Inherited Create;
GetMem(Start,Groesse);
Writer:=0;
Reader:=0;
Richtung:=0;
CS:=TCriticalSection.Create;
end;
procedure TForm2.TRingbuffer.GetData(pData : Pointer; pSize : Int64);
var zwSize1,zwSize2 : Int64;
begin
IF fReadyToRead.WaitFor(INFINITE) = wrSignaled THEN
BEGIN
fReadyToWrite.ResetEvent;
try
CS.Enter;
Diskrepanz:=Writer+Groesse*Richtung - Reader;
IF (Diskrepanz-pSize) > 0 THEN
BEGIN
IF Reader+pSize <= Groesse THEN
BEGIN
copyMemory(pData,POINTER(Int64(Speicher)+Reader),pSize);
Inc(Reader,pSize);
IF Reader=Groesse THEN
BEGIN
Reader:=0;
Richtung:=0;
END;
END
ELSE BEGIN
zwSize1:=Groesse-Reader;
zwSize2:=Reader+pSize-Groesse;
copyMemory(pData,POINTER(Int64(Speicher)+Reader),zwSize1);
copyMemory(POINTER(Int64(pData)+zwSize1),Speicher,zwSize2);
Reader:=zwSize2;
Richtung:=0;
END;
END;
finally
CS.Leave;
fReadyToWrite.SetEvent;
END;
END;
end;
procedure TForm2.TRingbuffer.PutData(pData : Pointer; pSize : Int64);
var zwSize1, zwSize2 : Int64;
begin
if fReadyToWrite.WaitFor(INFINITE)=wrSignaled THEN
BEGIN
fReadyToRead.ResetEvent;
try
CS.Enter;
Diskrepanz:=Writer+Groesse*Richtung - Reader;
IF Diskrepanz < Groesse THEN
BEGIN
IF Writer + pSize <= Groesse THEN
BEGIN
CopyMemory(POINTER(Int64(Speicher)+Writer),pData,pSize);
Inc(Writer,pSize);
IF Writer=Groesse THEN
BEGIN
Writer:=0;
Richtung:=1;
END;
END
ELSE BEGIN
zwSize1:=Groesse-Writer;
zwSize2:=Writer+pSize-Groesse;
CopyMemory(POINTER(Int64(Speicher)+Writer),pData,zwSize1);
CopyMemory(Speicher,POINTER(INTEGER(pData)+zwsize1),zwSize2);
Writer:=zwsize2;
Richtung:=1;
END;
END;
finally
CS.Leave;
fReadyToRead.SetEvent;
end;
END;
end;
procedure TForm2.TChangeThread.ExchangeFile;
begin
Form2.StreamThread.FExchangeFile:=true;
end;
procedure TForm2.TChangeThread.PrepareFile;
BEGIN
Inc(Form2.FileCount,1);
Form2.Datei[Form2.FileCount MOD
2]:=TFileStream.Create(Form2.rootname+format(fmstring,[Form2.FileCount]),fmCreate);
END;
procedure TForm2.TStreamThread.SetLabel;
begin
Form2.StatusLA.Caption:='StreamThread';
end;
procedure TForm2.TChangeThread.Execute;
begin
FreeOnTerminate:=true;
if Mode=ctmPrepare then
BEGIN
Synchronize(PrepareFile);
END;
IF Mode=ctmExchange THEN
BEGIN
Synchronize(ExchangeFile);
END;
end;
procedure TForm2.TStreamThread.CalculateDiscrepancy;
begin
FPos^.Diskrepanz:=(FPos^.Buffersize * FPos^.Richtung + FPos^.CurrentWPos) -
FPos^.CurrentRPos;
end;
procedure TForm2.TStreamThread.WriteMemToFile;
begin
//Code
IF (FAddrDatei <> nil) THEN
BEGIN
TFileStream(FAddrDatei^).Write(BlockMem^,FPos^.Blocksize);
Inc(FPos^.CurrentRPos,FPos^.BlockSize);
Inc(Form2.WrittenSize,FPos^.BlockSize);
END
ELSE
Form2.StatusLA.Caption:='Error';
end;
procedure TForm2.TStreamThread.ResetPosition;
begin
//Code
FPos^.CurrentRPos:=0;
FPos^.Richtung:=0;
end;
procedure TForm2.TStreamThread.Execute;
var zwPointer : Pointer; zwSize : Int64;
doneit : Boolean; dummymem : String;
begin
GetMem(BlockMem,FPos^.BlockSize);
fExchangeFile:=false;
WHILE not terminated DO
BEGIN
if FExchangeFile THEN
BEGIN
Form2.AddrDatei:[EMAIL PROTECTED] MOD 2];
FreeAndNil(Form2.Datei[(Form2.FileCount-1) MOD 2]);
FExchangeFile:=False;
END;
FAddrDatei:=POINTER(FAddrLink^);
IF {(FPos^.Diskrepanz>=FPos^.Blocksize) AND}
(assigned(TFileStream(FAddrDatei^))) AND (Form2.WrittenSize < Form2.Wholesize)
THEN
BEGIN
IF (FPos^.CurrentRPos + FPos^.BlockSize <= FPos^.Buffersize) THEN
BEGIN
CopyMemory(BlockMem,POINTER(FPos^.StartPos+FPos^.CurrentRPos),FPos^.BlockSize);
Synchronize(WriteMemToFile);
END; //if fpos^.currentrpos+fpos^.blocksize <= fpos^.buffersize
IF (FPos^.CurrentRPos >= FPos^.Buffersize) THEN
BEGIN
Synchronize(ResetPosition);
END; // if fpos^.currentrpos >= fpos^.buffersize
END; // if Diskrepanz>0
END; //while not terminated
dispose(BlockMem);
end;
procedure TForm2.WriteToMem;
begin
//Code
end;
procedure TForm2.DataAvailable(Sender : TObject; Error : Word);
var receive, dest,zw : Pointer; rcvsize : Int64;
zwsize : Int64;
begin
// StreamThread.Suspend;
rcvsize:=Socket.RcvdCount;
IF rcvsize <> Pos^.Blocksize THEN
Label5.Caption:=inttostr(rcvsize);
GetMem(receive,rcvsize);
Socket.Receive(receive,rcvsize);
IF (rcvsize > 0) THEN
BEGIN
Inc(CurrentFilesize,rcvsize);
Inc(WholeSize,rcvsize);
Inc(ReceivedPacks,rcvsize DIV Pos^.Blocksize);
IF (rcvsize MOD Pos^.Blocksize <> 0) THEN
Inc(ErrorPacks);
IF ((Pos^.CurrentWPos+rcvsize) <= Pos^.Buffersize) THEN
BEGIN
dest:=POINTER(Pos^.StartPos+Pos^.CurrentWPos);
CritSec.Enter;
CopyMemory(dest,receive,rcvsize);
CritSec.Leave;
END
ELSE BEGIN
zwsize:=Pos^.Buffersize - Pos^.CurrentWPos;
dest:=POINTER(Pos^.StartPos+Pos^.CurrentWPos);
CritSec.Enter;
CopyMemory(dest,receive,zwsize);
CritSec.Leave;
zw:=POINTER(Int64(receive)+zwsize);
dest:=POINTER(Pos^.StartPos);
zwsize:=(Pos^.CurrentWPos+rcvsize)-Pos^.Buffersize;
CritSec.Enter;
CopyMemory(dest,zw,zwsize);
Pos^.CurrentWPos:=zwsize;
Pos^.Richtung:=1;
CritSec.Leave;
END;
IF Pos^.CurrentWPos+rcvsize > Pos^.Buffersize THEN
BEGIN
CritSec.Enter;
Pos^.CurrentWPos:=0;
Pos^.Richtung:=1;
CritSec.Leave;
Label5.Caption:='Error on size:'+inttostr(rcvsize);
END
ELSE BEGIN
CritSec.Enter;
Inc(Pos^.CurrentWPos,rcvsize);
CritSec.Leave;
END;
END;
// StreamThread.Resume;
IF (CurrentFileSize >= (MaxSize-PreRunSize)) AND (DoExchange = 0) THEN
BEGIN
DoExchange:=1;
ChangeThread:=TChangeThread.Create(true);
ChangeThread.Mode:=ctmPrepare;
ChangeThread.Resume;
END;
IF (CurrentFileSize >= MaxSize) AND (DoExchange=1) THEN
BEGIN
DoExchange:=0;
CurrentFileSize:=0;
ExChangeThread:=TChangeThread.Create(true);
ExChangeThread.Mode:=ctmExchange;
ExChangeThread.Resume;
END;
dispose(receive);
Label1.Caption:=Inttostr(round((Pos^.CurrentWPos/Pos^.Buffersize)*100));
Label2.Caption:=Inttostr(round((Pos^.CurrentRPos/Pos^.Buffersize)*100));
Label3.Caption:=Inttostr(Pos^.Diskrepanz);
Label4.Caption:=Inttostr(CurrentFileSize);
Label5.Caption:=Inttostr(FileCount+1);
Label11.Caption:=Inttostr(WholeSize);
Label12.Caption:=Inttostr(WrittenSize);
Label17.Caption:=Inttostr(receivedPacks);
Label19.Caption:=Inttostr(ErrorPacks);
end;
procedure TForm2.Button2Click(Sender: TObject);
begin
new(Pos);
Pos^.Buffersize:=StrToInt(BuffED.Text);
Pos^.BlockSize:=StrToInt(BlockED.Text);
GetMem(DataRes,Pos^.Buffersize);
Pos^.CurrentWPos:=0;
Pos^.CurrentRPos:=0;
Pos^.StartPos:=Int64(DataRes);
Pos^.Richtung:=0;
Pos^.Diskrepanz:=0;
FileCount:=0;
MaxSize:=strtoint(MAXED.Text);
Prerunsize:=500000;
wholesize:=0;
CurrentFileSize:=0;
WrittenSize:=0;
ReceivedPacks:=0;
ErrorPacks:=0;
end;
procedure TForm2.Button1Click(Sender: TObject);
var TestPointer : Pointer;
begin
WholeSize:=0;
CurrentFileSize:=0;
MyVar:=0;
Socket:=TWSocket.Create(self);
Socket.Addr:='0.0.0.0';
Socket.MultiCastAddrStr:=IPED.Text;
Socket.Port:=PortED.Text;
Socket.Proto:='udp';
Socket.MultiCast:=true;
Socket.ReuseAddr:=true;
Socket.MultiThreaded:=true;
Socket.OnDataAvailable:=DataAvailable;
CritSec:=TCriticalSection.Create;
CreateDir(DIRED.Text);
Rootname:=DIRED.Text+'\'+FILED.Text+format(fmstring,[recordingid])+format(fmstring,[streamid]);
Datei[0]:=TFileStream.Create(Rootname+format(fmstring,[FileCount]),fmCreate);
AddrDatei:[EMAIL PROTECTED];
AddrLink:[EMAIL PROTECTED];
StreamThread:=TStreamThread.Create(true);
StreamThread.FDatenspeicher:=DataRes;
StreamThread.FPos:=Pointer(Int64(Pos));
StreamThread.FAusgabe:=StatusLA;
StreamThread.FAddrLink:=AddrLink;
StreamThread.FreeOnTerminate:=true;
StreamThread.Ffinalizethread:=false;
Socket.Listen;
StreamThread.Resume;
end;
end.
--
Christian Hinske
Schleißheimerstraße 157
D-80797 München
Tel. :(++ 49 89) 36 0 37 445
Mobil :(++49)176 234 10 146
e-mail: [EMAIL PROTECTED]
Der GMX SmartSurfer hilft bis zu 70% Ihrer Onlinekosten zu sparen!
Ideal für Modem und ISDN: http://www.gmx.net/de/go/smartsurfer
--
To unsubscribe or change your settings for TWSocket mailing list
please goto http://www.elists.org/mailman/listinfo/twsocket
Visit our website at http://www.overbyte.be