Author: alanmc
Date: 2007-03-17 11:40:52 -0500 (Sat, 17 Mar 2007)
New Revision: 74530
Modified:
trunk/bitsharp/src/MonoTorrent.Client/Managers/FileManager.cs
Log:
Commented and updated to make it look nicer
Modified: trunk/bitsharp/src/MonoTorrent.Client/Managers/FileManager.cs
===================================================================
--- trunk/bitsharp/src/MonoTorrent.Client/Managers/FileManager.cs
2007-03-17 16:25:21 UTC (rev 74529)
+++ trunk/bitsharp/src/MonoTorrent.Client/Managers/FileManager.cs
2007-03-17 16:40:52 UTC (rev 74530)
@@ -45,17 +45,22 @@
{
#region Private Members
- TorrentFile[] files;
- string baseDirectory;
- string savePath;
- private readonly int pieceLength;
- private long fileSize;
- private SHA1Managed hasher;
- private FileStream[] fileStreams;
- private bool initialHashRequired;
- private Thread ioThread;
- private bool ioActive;
- private ManualResetEvent threadWait;
+ TorrentFile[] files; // The files
that are in the torrent that we have to downoad
+ string baseDirectory; // The base
directory into which all the files will be put
+ string savePath; // The path
where the base directory will be put
+ private readonly int pieceLength; // The length
of a piece in the torrent
+ private long fileSize; // The
combined length of all the files
+ private SHA1Managed hasher; // The SHA1
hasher used to calculate the hash of a piece
+ private byte[] hashBuffer; // This is
used to buffer a piec in memory so it can be hashed
+ private FileStream[] fileStreams; // The
filestreams used to read/write to the files on disk
+ private bool initialHashRequired; // Used to
indicate whether we need to hashcheck the files or not
+ private Thread ioThread; // The
dedicated thread used for reading/writing
+ private bool ioActive; // Used to
signal when the IO thread is running
+ private ManualResetEvent threadWait; // Used to
signal the IO thread when some data is ready for it to work on
+
+ private object bufferedIoLock; // Used to
synchronise access on the IO thread
+ private Queue<BufferedFileWrite> bufferedWrites; // A list of
all the writes which are waiting to be performed
+ private Queue<BufferedFileWrite> bufferedReads; // A list of
all the reads which are waiting to be performed
#endregion
@@ -99,6 +104,7 @@
#region Constructors
+
/// <summary>
/// Creates a new FileManager with read-only access
/// </summary>
@@ -149,22 +155,120 @@
else
this.baseDirectory = baseDirectory;
+ this.bufferedIoLock = new object();
+ this.bufferedReads = new Queue<BufferedFileWrite>();
+ this.bufferedWrites = new Queue<BufferedFileWrite>();
this.files = files;
- this.savePath = savePath;
- this.initialHashRequired = false;
- this.pieceLength = pieceLength;
this.hasher = new SHA1Managed();
this.hashBuffer = new byte[pieceLength];
-
- if (files.Length == 1)
- baseDirectory = string.Empty;
-
+ this.initialHashRequired = false;
this.ioActive = true;
+ this.pieceLength = pieceLength;
+ this.savePath = savePath;
this.threadWait = new ManualResetEvent(false);
}
+
#endregion
+ #region Methods
+
+ /// <summary>
+ /// Closes all the filestreams
+ /// </summary>
+ internal void CloseFileStreams()
+ {
+ for (int i = 0; i < this.fileStreams.Length; i++)
+ this.fileStreams[i].Dispose();
+
+ this.fileStreams = null;
+
+ // Setting this boolean true allows the IO thread to terminate
gracefully
+ this.ioActive = false;
+
+ // Allow the IO thread to run.
+ SetHandleState(true);
+ }
+
+
+ /// <summary>
+ /// Disposes all necessary objects
+ /// </summary>
+ public void Dispose()
+ {
+ this.Dispose(true);
+ }
+
+
+ /// <summary>
+ /// Disposes all necessary objects
+ /// </summary>
+ public void Dispose(bool disposing)
+ {
+ hasher.Clear();
+ if (this.StreamsOpen)
+ CloseFileStreams();
+ }
+
+
+ /// <summary>
+ /// Flushes all data in the FileStreams to disk
+ /// </summary>
+ internal void FlushAll()
+ {
+ foreach (FileStream stream in this.fileStreams)
+ lock (stream)
+ stream.Flush();
+ }
+
+
+ /// <summary>
+ /// Generates the full path to the supplied TorrentFile
+ /// </summary>
+ /// <param name="file">The TorrentFile to generate the full path
to</param>
+ /// <param name="baseDirectory">The name of the directory that the
files are contained in</param>
+ /// <param name="savePath">The path to the directory that contains the
BaseDirectory</param>
+ /// <returns>The full path to the TorrentFile</returns>
+ private string GenerateFilePath(TorrentFile file, string
baseDirectory, string savePath)
+ {
+ string path = string.Empty;
+
+ path = Path.Combine(savePath, baseDirectory);
+ path = Path.Combine(path, file.Path);
+
+ if (!Directory.Exists(Path.GetDirectoryName(path)))
+ Directory.CreateDirectory(Path.GetDirectoryName(path));
+
+ return path;
+ }
+
+
+ /// <summary>
+ /// Generates the hash for the given piece
+ /// </summary>
+ /// <param name="pieceIndex">The piece to generate the hash for</param>
+ /// <returns>The 20 byte SHA1 hash of the supplied piece</returns>
+ internal byte[] GetHash(int pieceIndex)
+ {
+ lock (this.hashBuffer)
+ {
+ // Calculate the start index of the piece
+ long pieceStartIndex = (long)this.pieceLength * pieceIndex;
+
+ // Read in the entire piece
+ int bytesRead = this.Read(this.hashBuffer, 0, pieceStartIndex,
(int)(this.fileSize - pieceStartIndex > this.pieceLength ? this.pieceLength :
this.fileSize - pieceStartIndex));
+
+
+ // Compute the hash of the piece
+ return hasher.ComputeHash(this.hashBuffer, 0, bytesRead);
+ }
+ }
+
+
+ /// <summary>
+ /// Opens all the filestreams with the specified file access
+ /// </summary>
+ /// <param name="fileAccess"></param>
internal void OpenFileStreams(FileAccess fileAccess)
{
string filePath = null;
@@ -194,43 +298,154 @@
this.ioThread.Start();
}
- internal void CloseFileStreams()
+
+ /// <summary>
+ /// Performs the buffered write
+ /// </summary>
+ /// <param name="bufferedFileIO"></param>
+ private void PerformWrite(BufferedFileWrite bufferedFileIO)
{
- for (int i = 0; i < this.fileStreams.Length; i++)
- this.fileStreams[i].Dispose();
+ PeerConnectionID id = bufferedFileIO.Id;
+ byte[] recieveBuffer = bufferedFileIO.Buffer;
+ PieceMessage message = (PieceMessage)bufferedFileIO.Message;
+ Piece piece = bufferedFileIO.Piece;
- this.fileStreams = null;
+ // Calculate the index where we will start to write the data
+ long writeIndex = (long)message.PieceIndex * message.PieceLength +
message.StartOffset;
- // Setting this boolean true allows the IO thread to terminate
gracefully
- this.ioActive = false;
+ // Perform the actual write
+ this.Write(recieveBuffer, message.DataOffset, writeIndex,
message.BlockLength);
- // Allow the IO thread to run.
- SetHandleState(true);
+ // Find the block that this data belongs to and set it's state to
"Written"
+ Block block = PiecePickerBase.GetBlockFromIndex(piece,
message.StartOffset, message.BlockLength);
+ block.Written = true;
+
+ // Release the buffer back into the buffer manager.
+ ClientEngine.BufferManager.FreeBuffer(ref bufferedFileIO.Buffer);
+
+ // If we haven't written all the pieces to disk, there's no point
in hash checking
+ if (!piece.AllBlocksWritten)
+ return;
+
+ // Hashcheck the piece as we now have all the blocks.
+ bool result =
ToolBox.ByteMatch(id.TorrentManager.Torrent.Pieces[piece.Index],
id.TorrentManager.FileManager.GetHash(piece.Index));
+ id.TorrentManager.Bitfield[message.PieceIndex] = result;
+
+ id.TorrentManager.HashedPiece(new
PieceHashedEventArgs(piece.Index, result));
+
+ // If the piece was successfully hashed, enqueue a new "have"
message to be sent out
+ // Otherwise increment that peers HashFails.
+ if (result)
+ lock (id.TorrentManager.finishedPieces)
+ id.TorrentManager.finishedPieces.Enqueue(piece.Index);
+ else
+ id.Peer.HashFails++;
}
- #region Methods
+
/// <summary>
- /// Generates the full path to the supplied TorrentFile
+ /// Performs the buffered read
/// </summary>
- /// <param name="file">The TorrentFile to generate the full path
to</param>
- /// <param name="baseDirectory">The name of the directory that the
files are contained in</param>
- /// <param name="savePath">The path to the directory that contains the
BaseDirectory</param>
- /// <returns>The full path to the TorrentFile</returns>
- private string GenerateFilePath(TorrentFile file, string
baseDirectory, string savePath)
+ /// <param name="bufferedFileIO"></param>
+ private void PerformRead(BufferedFileWrite bufferedFileIO)
{
- string path = string.Empty;
+ throw new Exception("The method or operation is not implemented.");
+ }
- path = Path.Combine(savePath, baseDirectory);
- path = Path.Combine(path, file.Path);
- if (!Directory.Exists(Path.GetDirectoryName(path)))
- Directory.CreateDirectory(Path.GetDirectoryName(path));
+ /// <summary>
+ /// Queues a block of data to be written asynchronously
+ /// </summary>
+ /// <param name="id">The peer who sent the block</param>
+ /// <param name="recieveBuffer">The array containing the block</param>
+ /// <param name="message">The PieceMessage</param>
+ /// <param name="piece">The piece that the block to be written is part
of</param>
+ internal void QueueWrite(PeerConnectionID id, byte[] recieveBuffer,
PieceMessage message, Piece piece)
+ {
+ lock (this.bufferedIoLock)
+ {
+ // Request a new buffer from the buffermanager and copy the
data from the receive buffer
+ // into this new buffer. This is needed as the main code will
automatically release the receive buffer
+ // and we will lose the data.
+ byte[] buffer = BufferManager.EmptyBuffer;
+ ClientEngine.BufferManager.GetBuffer(ref buffer,
BufferType.LargeMessageBuffer);
+ Buffer.BlockCopy(recieveBuffer, 0, buffer, 0,
recieveBuffer.Length);
- return path;
+ bufferedWrites.Enqueue(new BufferedFileWrite(id, buffer,
message, piece, id.TorrentManager.Bitfield));
+ SetHandleState(true);
+ }
}
/// <summary>
+ /// Queues a read request to be completed asynchronously
+ /// </summary>
+ /// <param name="id">The peer which the write request is for</param>
+ /// <param name="recieveBuffer">The buffer to read the data
into</param>
+ /// <param name="message">The RequestMessage</param>
+ /// <param name="piece"></param>
+ internal void QueueRead(PeerConnectionID id, byte[] recieveBuffer,
RequestMessage message, Piece piece)
+ {
+ lock (this.bufferedIoLock)
+ {
+ this.bufferedReads.Enqueue(new BufferedFileWrite(id,
recieveBuffer, message, piece, id.TorrentManager.Bitfield));
+ SetHandleState(true);
+ }
+ }
+
+
+ /// <summary>
+ /// This method runs in a dedicated thread. It performs all the async
reads and writes as they are queued
+ /// </summary>
+ private void RunIO()
+ {
+ BufferedFileWrite write;
+ BufferedFileWrite read;
+ while (ioActive)
+ {
+ write = null;
+ read = null;
+
+ // Take a lock on the iolock and dequeue any reads/writes that
are available. Then lose the lock before
+ // performing the actual read/write
+ lock (this.bufferedIoLock)
+ {
+ if (this.bufferedWrites.Count > 0)
+ write = this.bufferedWrites.Dequeue();
+
+ if (this.bufferedReads.Count > 0)
+ read = this.bufferedReads.Dequeue();
+
+ // If there are no more reads available and no more writes
available, set the handle to wait
+ if (this.bufferedReads.Count == 0 &&
this.bufferedWrites.Count == 0)
+ SetHandleState(false);
+ }
+
+ if (write != null)
+ PerformWrite(write);
+
+ if (read != null)
+ PerformRead(read);
+
+ this.threadWait.WaitOne();
+ }
+ }
+
+
+ /// <summary>
+ /// Sets the wait handle to Signaled (true) or Non-Signaled(false)
+ /// </summary>
+ /// <param name="set"></param>
+ private void SetHandleState(bool set)
+ {
+ if (set)
+ this.threadWait.Set();
+ else
+ this.threadWait.Reset();
+ }
+
+
+ /// <summary>
/// This method reads 'count' number of bytes from the filestream
starting at index 'offset'.
/// The bytes are read into the buffer starting at index
'bufferOffset'.
/// </summary>
@@ -317,9 +532,14 @@
fileStreams[i].Seek(offset, SeekOrigin.Begin);
offset = 0; // Any further files need to be written from
the beginning of the file
+ // Find the maximum number of bytes we can write before we
reach the end of the file
bytesWeCanWrite = this.fileStreams[i].Length -
this.fileStreams[i].Position;
- bytesWritten = (bytesWeCanWrite < (count - totalWritten))
? bytesWeCanWrite : (count - totalWritten);
+ // If the amount of data we are going to write is larger
than the amount we can write, just write the allowed
+ // amount and let the rest of the data be written with the
next filestream
+ bytesWritten = ((count - totalWritten) > bytesWeCanWrite)
? bytesWeCanWrite : (count - totalWritten);
+
+ // Write the data
this.fileStreams[i].Write(buffer, bufferOffset +
(int)totalWritten, (int)bytesWritten);
totalWritten += bytesWritten;
@@ -328,156 +548,6 @@
}
}
-
- /// <summary>
- /// Flushes all data in the FileStreams to disk
- /// </summary>
- internal void FlushAll()
- {
- foreach (FileStream stream in this.fileStreams)
- lock (stream)
- stream.Flush();
- }
-
-
- private byte[] hashBuffer;
- /// <summary>
- /// Generates the hash for the given piece
- /// </summary>
- /// <param name="pieceIndex">The piece to generate the hash for</param>
- /// <returns>The 20 byte SHA1 hash of the supplied piece</returns>
- internal byte[] GetHash(int pieceIndex)
- {
- lock (this.hashBuffer)
- {
- long pieceStartIndex = (long)this.pieceLength * pieceIndex;
- int bytesRead = this.Read(this.hashBuffer, 0, pieceStartIndex,
(int)(this.fileSize - pieceStartIndex > this.pieceLength ? this.pieceLength :
this.fileSize - pieceStartIndex));
- return hasher.ComputeHash(this.hashBuffer, 0, bytesRead);
- }
- }
-
-
- /// <summary>
- /// Disposes all necessary objects
- /// </summary>
- public void Dispose()
- {
- this.Dispose(true);
- }
-
-
- /// <summary>
- /// Disposes all necessary objects
- /// </summary>
- public void Dispose(bool disposing)
- {
- hasher.Clear();
- if (this.StreamsOpen)
- CloseFileStreams();
- }
#endregion
-
- private object bufferedIoLock = new object();
- private Queue<BufferedFileWrite> bufferedWrites = new
Queue<BufferedFileWrite>();
- private Queue<BufferedFileWrite> bufferedReads = new
Queue<BufferedFileWrite>();
-
- internal void QueueWrite(PeerConnectionID id, byte[] recieveBuffer,
PieceMessage message, Piece piece)
- {
- lock (this.bufferedIoLock)
- {
- byte[] buffer = BufferManager.EmptyBuffer;
- ClientEngine.BufferManager.GetBuffer(ref buffer,
BufferType.LargeMessageBuffer);
- Buffer.BlockCopy(recieveBuffer, 0, buffer, 0,
recieveBuffer.Length);
-
- bufferedWrites.Enqueue(new BufferedFileWrite(id, buffer,
message, piece, id.TorrentManager.Bitfield));
- SetHandleState(true);
- }
- }
-
- internal void QueueRead(PeerConnectionID id, byte[] recieveBuffer,
RequestMessage message, Piece piece)
- {
- lock (this.bufferedIoLock)
- {
- this.bufferedReads.Enqueue(new BufferedFileWrite(id,
recieveBuffer, message, piece, id.TorrentManager.Bitfield));
- SetHandleState(true);
- }
- }
-
- private void RunIO()
- {
- BufferedFileWrite write;
- BufferedFileWrite read;
- while (ioActive)
- {
- write = null;
- read = null;
- lock (this.bufferedIoLock)
- {
- if (this.bufferedWrites.Count > 0)
- write = this.bufferedWrites.Dequeue();
-
- if (this.bufferedReads.Count > 0)
- read = this.bufferedReads.Dequeue();
-
- if (this.bufferedReads.Count == 0 &&
this.bufferedWrites.Count == 0)
- SetHandleState(false);
- }
-
- if (write != null)
- PerformWrite(write);
-
- if (read != null)
- PerformRead(read);
-
- this.threadWait.WaitOne();
- }
- }
-
- private void PerformWrite(BufferedFileWrite bufferedFileIO)
- {
- PeerConnectionID id = bufferedFileIO.Id;
- byte[] recieveBuffer = bufferedFileIO.Buffer;
- PieceMessage message = (PieceMessage)bufferedFileIO.Message;
- Piece piece = bufferedFileIO.Piece;
-
- long writeIndex = (long)message.PieceIndex * message.PieceLength +
message.StartOffset;
- this.Write(recieveBuffer, message.DataOffset, writeIndex,
message.BlockLength);
- for (int i = 0; i < piece.Blocks.Length; i++)
- {
- if (piece[i].StartOffset != message.StartOffset)
- continue;
-
- piece[i].Written = true;
- break;
- }
- ClientEngine.BufferManager.FreeBuffer(ref bufferedFileIO.Buffer);
-
- if (!piece.AllBlocksWritten)
- return;
-
- bool result =
ToolBox.ByteMatch(id.TorrentManager.Torrent.Pieces[piece.Index],
id.TorrentManager.FileManager.GetHash(piece.Index));
- bufferedFileIO.BitField[message.PieceIndex] = result;
-
- id.TorrentManager.HashedPiece(new
PieceHashedEventArgs(piece.Index, result));
-
- if (result)
- lock (id.TorrentManager.finishedPieces)
- id.TorrentManager.finishedPieces.Enqueue(piece.Index);
- else
- id.Peer.HashFails++;
- }
-
- private void PerformRead(BufferedFileWrite bufferedFileIO)
- {
- throw new Exception("The method or operation is not implemented.");
- }
-
- private void SetHandleState(bool set)
- {
- if (set)
- this.threadWait.Set();
- else
- this.threadWait.Reset();
- }
}
}
\ No newline at end of file
_______________________________________________
Mono-patches maillist - [email protected]
http://lists.ximian.com/mailman/listinfo/mono-patches