Hi everyone, I am quite new to this mailing list, but I hope anyone can answer 
my question. 
The problem I am experiencing is as follows:
I get MPEG2 TS-data from a realtime hardware encoder who is multicasting into 
the local network. I am using TWSocket to receive the datapackes. As some of 
you might know, MPEG2 Transport Stream (TS) consists of 188 Byte including a 
4Byte header, which I can check to see, if the received package is OK.
Now, if I am working with only one thread, everything is fine. Once I am adding 
a second thread (OnDataAvailable -> RingBuffer; RingBuffer-> Thread2 -> File), 
it seems that OnDataAvailable isn't executed for every event and so buffers the 
data. 
The hardware encoder packs 7 MPEG2 packages into one udp-packet, so the 
standard-size package I get is 1316 Byte. Now, once I am applying the second 
thread, packages get multiples of that (2632), but they all share the same 
characteristics: The first header is damaged, 2nd - 7th are fine, 8th - 
whatever size are empty or random data.
I was wondering if maybe TWSocket does not copy the second 1316Byte packet into 
the buffer-area after the first 1316Byte package if OnDataAvailable hasn't been 
called in between, but somewhere into the first bytes, which would explain why 
the first header (with 188 Bytes following is broken).
This is how parts of my debugfile look like

1316 Byte udp package received
header 1 - OKAY, pid 256 -> payload video
header 2 - OKAY, pid 256 -> payload video
header 3 - OKAY, pid 256 -> payload video
header 4 - OKAY, pid 259 -> payload audio
header 5 - OKAY, pid 256 -> payload video
header 6 - OKAY, pid 256 -> payload video
header 7 - OKAY, pid 256 -> payload video

2632 Byte udp package received
header 1  - ERROR, pid 19440
header 2  - OKAY, pid 256
header 3  - OKAY, pid 256
header 4  - OKAY, pid 256
header 5  - OKAY, pid 256
header 6  - OKAY, pid 256
header 7  - OKAY, pid 256
header 8  - ERROR, pid 0
header 9  - ERROR, pid 0
header 10 - ERROR, pid 0
header 11 - ERROR, pid 0
header 12 - ERROR, pid 0
header 13 - ERROR, pid 0
header 14 - ERROR, pid 0

The code I am using is in analogy to the TWSocket multithreading example. Code 
looks like this:

type
  TPos = RECORD
    Buffersize, Blocksize : Int64;
    CurrentWPos, CurrentRPos, StartPos : Int64;
    Diskrepanz : Int64;
    Richtung : Byte;
  END;
  TStreamThread = class(TThread)
    private
      FAddrDatei     : Pointer;
      BlockMem       : Pointer;
      procedure CalculateDiscrepancy;
      procedure WriteMemToFile;
      procedure ResetPosition;
    public
      FDatenspeicher : Pointer;
      FPos           : ^TPos;
      FAddrLink      : Pointer;
      FAusgabe       : TLabel;
      Ffinalizethread: Boolean;
      FExchangeFile  : Boolean;
      procedure SetLabel;
      procedure Execute; override;
  end;
  TChangeThread = class(TThread)
    private
      procedure ExchangeFile;
      procedure PrepareFile;
    public
      Mode     : Byte;
      procedure Execute; override;
  end;
  TRingBuffer = class(TObject)
    private
      Speicher   : Pointer;
      Groesse    : Int64;
      Start      : Pointer;
      Writer     : Int64;
      Reader     : Int64;
      Diskrepanz : Int64;
      Richtung   : Integer;
    public
      fReadyToWrite : TEvent;
      fReadyToRead  : TEvent;
      CS : TCriticalSection;
      constructor Create;
      procedure GetData(pData : Pointer; pSize : Int64);
      procedure PutData(pData : Pointer; pSize : Int64);
  end;
  var
    Datei                 : ARRAY[0..1] OF TFileStream;
    AddrDatei             : Pointer;
    AddrLink              : Pointer;
    DoExchange            : Byte;
    RootName              : String;
    Socket:TWSocket;
    StreamThread          : TStreamThread;
    ChangeThread          : TChangeThread;
    ExchangeThread        : TChangeThread;

  public
    { Public-Deklarationen }

    DataRes        : Pointer;
    Pos            : ^TPos;
    MyVar          : Integer;
    CritSec        : TCriticalSection;
    Wholesize      : Int64;
    CurrentFileSize: Int64;
    WrittenSize    : Int64;
    Testvariable   : Int64;
    ReceivedPacks  : Int64;
    MaxSize        : Int64;
    PrerunSize     : Int64;
    FileCount      : Integer;
    StreamID       : Integer;
    RecordingID    : Integer;
    ErrorPacks     : Int64;
    procedure DataAvailable(Sender : TObject; Error : Word);
    procedure WriteToMem;
  end;
var
  Form2: TForm2;

implementation
{$R *.dfm}
{procedure TForm2.SetLabel(lname: string);
begin
  StatusLA.Caption:=lname;
end;  }
constructor TForm2.TRingBuffer.Create;
begin
  Inherited Create;
  GetMem(Start,Groesse);
  Writer:=0;
  Reader:=0;
  Richtung:=0;
  CS:=TCriticalSection.Create;
end;
procedure TForm2.TRingbuffer.GetData(pData : Pointer; pSize : Int64);
var zwSize1,zwSize2 : Int64;
begin
  IF fReadyToRead.WaitFor(INFINITE) = wrSignaled THEN
  BEGIN
    fReadyToWrite.ResetEvent;
    try
      CS.Enter;
      Diskrepanz:=Writer+Groesse*Richtung - Reader;
      IF (Diskrepanz-pSize) > 0 THEN
      BEGIN
        IF Reader+pSize <= Groesse THEN
        BEGIN
          copyMemory(pData,POINTER(Int64(Speicher)+Reader),pSize);
          Inc(Reader,pSize);
          IF Reader=Groesse THEN
          BEGIN
            Reader:=0;
            Richtung:=0;
          END;
        END
        ELSE BEGIN
          zwSize1:=Groesse-Reader;
          zwSize2:=Reader+pSize-Groesse;
          copyMemory(pData,POINTER(Int64(Speicher)+Reader),zwSize1);
          copyMemory(POINTER(Int64(pData)+zwSize1),Speicher,zwSize2);
          Reader:=zwSize2;
          Richtung:=0;
        END;
      END;
    finally
      CS.Leave;
      fReadyToWrite.SetEvent;
    END;
  END;
end;
procedure TForm2.TRingbuffer.PutData(pData : Pointer; pSize : Int64);
var zwSize1, zwSize2 : Int64;
begin
  if fReadyToWrite.WaitFor(INFINITE)=wrSignaled THEN
  BEGIN
    fReadyToRead.ResetEvent;
    try
      CS.Enter;
      Diskrepanz:=Writer+Groesse*Richtung - Reader;
      IF Diskrepanz < Groesse THEN
      BEGIN
        IF Writer + pSize <= Groesse THEN
        BEGIN
          CopyMemory(POINTER(Int64(Speicher)+Writer),pData,pSize);
          Inc(Writer,pSize);
          IF Writer=Groesse THEN
          BEGIN
            Writer:=0;
            Richtung:=1;
          END;
        END
        ELSE BEGIN
          zwSize1:=Groesse-Writer;
          zwSize2:=Writer+pSize-Groesse;
          CopyMemory(POINTER(Int64(Speicher)+Writer),pData,zwSize1);
          CopyMemory(Speicher,POINTER(INTEGER(pData)+zwsize1),zwSize2);
          Writer:=zwsize2;
          Richtung:=1;
        END;
      END;
    finally
      CS.Leave;
      fReadyToRead.SetEvent;
    end;
  END;
end;
procedure TForm2.TChangeThread.ExchangeFile;
begin
  Form2.StreamThread.FExchangeFile:=true;
end;
procedure TForm2.TChangeThread.PrepareFile;
BEGIN
  Inc(Form2.FileCount,1);
  Form2.Datei[Form2.FileCount MOD 
2]:=TFileStream.Create(Form2.rootname+format(fmstring,[Form2.FileCount]),fmCreate);
END;
procedure TForm2.TStreamThread.SetLabel;
begin
  Form2.StatusLA.Caption:='StreamThread';
end;
procedure TForm2.TChangeThread.Execute;
begin
  FreeOnTerminate:=true;
  if Mode=ctmPrepare then
  BEGIN
    Synchronize(PrepareFile);
  END;
  IF Mode=ctmExchange THEN
  BEGIN
    Synchronize(ExchangeFile);
  END;
end;
procedure TForm2.TStreamThread.CalculateDiscrepancy;
begin
  FPos^.Diskrepanz:=(FPos^.Buffersize * FPos^.Richtung + FPos^.CurrentWPos) - 
FPos^.CurrentRPos;
end;
procedure TForm2.TStreamThread.WriteMemToFile;
begin
 //Code
  IF (FAddrDatei <> nil) THEN
  BEGIN
    TFileStream(FAddrDatei^).Write(BlockMem^,FPos^.Blocksize);
    Inc(FPos^.CurrentRPos,FPos^.BlockSize);
    Inc(Form2.WrittenSize,FPos^.BlockSize);
  END
  ELSE
    Form2.StatusLA.Caption:='Error';
end;
procedure TForm2.TStreamThread.ResetPosition;
begin
 //Code
  FPos^.CurrentRPos:=0;
  FPos^.Richtung:=0;
end;

procedure TForm2.TStreamThread.Execute;
var zwPointer : Pointer; zwSize : Int64;
    doneit : Boolean; dummymem : String;
begin
 GetMem(BlockMem,FPos^.BlockSize);
 fExchangeFile:=false;
 WHILE not terminated DO
  BEGIN
    if FExchangeFile THEN
    BEGIN
      Form2.AddrDatei:[EMAIL PROTECTED] MOD 2];
      FreeAndNil(Form2.Datei[(Form2.FileCount-1) MOD 2]);
      FExchangeFile:=False;
    END;
    FAddrDatei:=POINTER(FAddrLink^);

    IF {(FPos^.Diskrepanz>=FPos^.Blocksize) AND} 
(assigned(TFileStream(FAddrDatei^))) AND (Form2.WrittenSize < Form2.Wholesize) 
THEN
    BEGIN
      IF (FPos^.CurrentRPos + FPos^.BlockSize <= FPos^.Buffersize) THEN
      BEGIN
        
CopyMemory(BlockMem,POINTER(FPos^.StartPos+FPos^.CurrentRPos),FPos^.BlockSize);
        Synchronize(WriteMemToFile);
      END; //if fpos^.currentrpos+fpos^.blocksize <= fpos^.buffersize
      IF (FPos^.CurrentRPos >= FPos^.Buffersize) THEN
      BEGIN
        Synchronize(ResetPosition);
      END; // if fpos^.currentrpos >= fpos^.buffersize
    END; // if Diskrepanz>0
  END; //while not terminated
  dispose(BlockMem);
end;

procedure TForm2.WriteToMem;
begin
//Code
end;
procedure TForm2.DataAvailable(Sender : TObject; Error : Word);
var receive, dest,zw : Pointer; rcvsize : Int64;
    zwsize : Int64;
begin
//  StreamThread.Suspend;
  rcvsize:=Socket.RcvdCount;
  IF rcvsize <> Pos^.Blocksize THEN
    Label5.Caption:=inttostr(rcvsize);
  GetMem(receive,rcvsize);
  Socket.Receive(receive,rcvsize);
  IF (rcvsize > 0) THEN
  BEGIN
    Inc(CurrentFilesize,rcvsize);
    Inc(WholeSize,rcvsize);
    Inc(ReceivedPacks,rcvsize DIV Pos^.Blocksize);
    IF (rcvsize MOD Pos^.Blocksize <> 0) THEN
      Inc(ErrorPacks);
    IF ((Pos^.CurrentWPos+rcvsize) <= Pos^.Buffersize) THEN
    BEGIN
      dest:=POINTER(Pos^.StartPos+Pos^.CurrentWPos);
      CritSec.Enter;
        CopyMemory(dest,receive,rcvsize);
      CritSec.Leave;
    END
    ELSE BEGIN
      zwsize:=Pos^.Buffersize - Pos^.CurrentWPos;
      dest:=POINTER(Pos^.StartPos+Pos^.CurrentWPos);
      CritSec.Enter;
        CopyMemory(dest,receive,zwsize);
      CritSec.Leave;
      zw:=POINTER(Int64(receive)+zwsize);
      dest:=POINTER(Pos^.StartPos);
      zwsize:=(Pos^.CurrentWPos+rcvsize)-Pos^.Buffersize;
      CritSec.Enter;
        CopyMemory(dest,zw,zwsize);
        Pos^.CurrentWPos:=zwsize;
        Pos^.Richtung:=1;
      CritSec.Leave;
    END;
    IF Pos^.CurrentWPos+rcvsize > Pos^.Buffersize THEN
    BEGIN
      CritSec.Enter;
        Pos^.CurrentWPos:=0;
        Pos^.Richtung:=1;
      CritSec.Leave;
      Label5.Caption:='Error on size:'+inttostr(rcvsize);
    END
    ELSE BEGIN
      CritSec.Enter;
        Inc(Pos^.CurrentWPos,rcvsize);
      CritSec.Leave;
    END;
  END;
//  StreamThread.Resume;
  IF (CurrentFileSize >= (MaxSize-PreRunSize)) AND (DoExchange = 0) THEN
  BEGIN
    DoExchange:=1;
    ChangeThread:=TChangeThread.Create(true);
    ChangeThread.Mode:=ctmPrepare;
    ChangeThread.Resume;
  END;
  IF (CurrentFileSize >= MaxSize) AND (DoExchange=1) THEN
  BEGIN
    DoExchange:=0;
    CurrentFileSize:=0;
    ExChangeThread:=TChangeThread.Create(true);
    ExChangeThread.Mode:=ctmExchange;
    ExChangeThread.Resume;
  END;
  dispose(receive);
  Label1.Caption:=Inttostr(round((Pos^.CurrentWPos/Pos^.Buffersize)*100));
  Label2.Caption:=Inttostr(round((Pos^.CurrentRPos/Pos^.Buffersize)*100));
  Label3.Caption:=Inttostr(Pos^.Diskrepanz);
  Label4.Caption:=Inttostr(CurrentFileSize);
  Label5.Caption:=Inttostr(FileCount+1);
  Label11.Caption:=Inttostr(WholeSize);
  Label12.Caption:=Inttostr(WrittenSize);
  Label17.Caption:=Inttostr(receivedPacks);
  Label19.Caption:=Inttostr(ErrorPacks);
end;
procedure TForm2.Button2Click(Sender: TObject);
begin
  new(Pos);
  Pos^.Buffersize:=StrToInt(BuffED.Text);
  Pos^.BlockSize:=StrToInt(BlockED.Text);
  GetMem(DataRes,Pos^.Buffersize);
  Pos^.CurrentWPos:=0;
  Pos^.CurrentRPos:=0;
  Pos^.StartPos:=Int64(DataRes);
  Pos^.Richtung:=0;
  Pos^.Diskrepanz:=0;
  FileCount:=0;
  MaxSize:=strtoint(MAXED.Text);
  Prerunsize:=500000;
  wholesize:=0;
  CurrentFileSize:=0;
  WrittenSize:=0;
  ReceivedPacks:=0;
  ErrorPacks:=0;
end;
procedure TForm2.Button1Click(Sender: TObject);
var TestPointer : Pointer;
begin
  WholeSize:=0;
  CurrentFileSize:=0;
  MyVar:=0;
  Socket:=TWSocket.Create(self);
  Socket.Addr:='0.0.0.0';
  Socket.MultiCastAddrStr:=IPED.Text;
  Socket.Port:=PortED.Text;
  Socket.Proto:='udp';
  Socket.MultiCast:=true;
  Socket.ReuseAddr:=true;
  Socket.MultiThreaded:=true;
  Socket.OnDataAvailable:=DataAvailable;

  CritSec:=TCriticalSection.Create;
  CreateDir(DIRED.Text);
  
Rootname:=DIRED.Text+'\'+FILED.Text+format(fmstring,[recordingid])+format(fmstring,[streamid]);
  Datei[0]:=TFileStream.Create(Rootname+format(fmstring,[FileCount]),fmCreate);
  AddrDatei:[EMAIL PROTECTED];
  AddrLink:[EMAIL PROTECTED];

  StreamThread:=TStreamThread.Create(true);
  StreamThread.FDatenspeicher:=DataRes;
  StreamThread.FPos:=Pointer(Int64(Pos));
  StreamThread.FAusgabe:=StatusLA;
  StreamThread.FAddrLink:=AddrLink;
  StreamThread.FreeOnTerminate:=true;

  StreamThread.Ffinalizethread:=false;

  Socket.Listen;
  StreamThread.Resume;
end;
end.
-- 
  Christian Hinske
  Schleißheimerstraße 157
  D-80797 München
  Tel.  :(++ 49 89) 36 0 37 445
  Mobil :(++49)176 234 10 146
  e-mail: [EMAIL PROTECTED]

Der GMX SmartSurfer hilft bis zu 70% Ihrer Onlinekosten zu sparen! 
Ideal für Modem und ISDN: http://www.gmx.net/de/go/smartsurfer
-- 
To unsubscribe or change your settings for TWSocket mailing list
please goto http://www.elists.org/mailman/listinfo/twsocket
Visit our website at http://www.overbyte.be

Reply via email to