Made code in OperationQueue more readable; added new class PartialStream which wraps a segment of a larger stream as if it was a separate stream, allowing access to certain regions of a stream to be prohibited and headers of a stream to be cut off without requiring a lengthy copy operation

git-svn-id: file:///srv/devel/repo-conversion/nusu@134 d2e56fa2-650e-0410-a79f-9358c0239efd
This commit is contained in:
Markus Ewald 2009-05-05 19:31:05 +00:00
parent 1c317b3f66
commit 7e9e0e2bf1
6 changed files with 800 additions and 12 deletions

View File

@ -118,6 +118,10 @@
<Compile Include="Source\Collections\ReverseComparer.Test.cs"> <Compile Include="Source\Collections\ReverseComparer.Test.cs">
<DependentUpon>ReverseComparer.cs</DependentUpon> <DependentUpon>ReverseComparer.cs</DependentUpon>
</Compile> </Compile>
<Compile Include="Source\IO\PartialStream.cs" />
<Compile Include="Source\IO\PartialStream.Test.cs">
<DependentUpon>PartialStream.cs</DependentUpon>
</Compile>
<Compile Include="Source\IO\RingMemoryStream.cs" /> <Compile Include="Source\IO\RingMemoryStream.cs" />
<Compile Include="Source\IO\RingMemoryStream.Test.cs"> <Compile Include="Source\IO\RingMemoryStream.Test.cs">
<DependentUpon>RingMemoryStream.cs</DependentUpon> <DependentUpon>RingMemoryStream.cs</DependentUpon>

View File

@ -100,6 +100,10 @@
<Compile Include="Source\Collections\ReverseComparer.Test.cs"> <Compile Include="Source\Collections\ReverseComparer.Test.cs">
<DependentUpon>ReverseComparer.cs</DependentUpon> <DependentUpon>ReverseComparer.cs</DependentUpon>
</Compile> </Compile>
<Compile Include="Source\IO\PartialStream.cs" />
<Compile Include="Source\IO\PartialStream.Test.cs">
<DependentUpon>PartialStream.cs</DependentUpon>
</Compile>
<Compile Include="Source\IO\RingMemoryStream.cs" /> <Compile Include="Source\IO\RingMemoryStream.cs" />
<Compile Include="Source\IO\RingMemoryStream.Test.cs"> <Compile Include="Source\IO\RingMemoryStream.Test.cs">
<DependentUpon>RingMemoryStream.cs</DependentUpon> <DependentUpon>RingMemoryStream.cs</DependentUpon>

View File

@ -227,7 +227,7 @@ namespace Nuclex.Support.IO {
[Test] [Test]
public void TestPartitionedRead() { public void TestPartitionedRead() {
ChainStream chainer = chainTwoStreamsOfTenBytes(); ChainStream chainer = chainTwoStreamsOfTenBytes();
((MemoryStream)chainer.ChainedStreams[0]).Write( ((MemoryStream)chainer.ChainedStreams[0]).Write(
new byte[] { 1, 2, 3, 4, 5 }, 0, 5 new byte[] { 1, 2, 3, 4, 5 }, 0, 5
); );
@ -310,7 +310,7 @@ namespace Nuclex.Support.IO {
byte[] buffer = new byte[5]; byte[] buffer = new byte[5];
int readByteCount = chainer.Read(buffer, 0, 3); int readByteCount = chainer.Read(buffer, 0, 3);
Assert.AreEqual(3, readByteCount); Assert.AreEqual(3, readByteCount);
Assert.AreEqual(new byte[] { 0, 9, 8, 0, 0 }, buffer); Assert.AreEqual(new byte[] { 0, 9, 8, 0, 0 }, buffer);

View File

@ -0,0 +1,515 @@
#region CPL License
/*
Nuclex Framework
Copyright (C) 2002-2009 Nuclex Development Labs
This library is free software; you can redistribute it and/or
modify it under the terms of the IBM Common Public License as
published by the IBM Corporation; either version 1.0 of the
License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
IBM Common Public License for more details.
You should have received a copy of the IBM Common Public
License along with this library
*/
#endregion
#if UNITTEST
using System;
using System.IO;
using NUnit.Framework;
namespace Nuclex.Support.IO {
/// <summary>Unit Test for the partial stream</summary>
[TestFixture]
public class PartialStreamTest {
#region class TestStream
/// <summary>Testing stream that allows specific features to be disabled</summary>
private class TestStream : Stream {
/// <summary>Initializes a new test stream</summary>
/// <param name="wrappedStream">Stream that will be wrapped</param>
/// <param name="allowRead">Whether to allow reading from the stream</param>
/// <param name="allowWrite">Whether to allow writing to the stream</param>
/// <param name="allowSeek">Whether to allow seeking within the stream</param>
public TestStream(
Stream wrappedStream, bool allowRead, bool allowWrite, bool allowSeek
) {
this.stream = wrappedStream;
this.readAllowed = allowRead;
this.writeAllowed = allowWrite;
this.seekAllowed = allowSeek;
}
/// <summary>Whether data can be read from the stream</summary>
public override bool CanRead {
get { return this.readAllowed; }
}
/// <summary>Whether the stream supports seeking</summary>
public override bool CanSeek {
get { return this.seekAllowed; }
}
/// <summary>Whether data can be written into the stream</summary>
public override bool CanWrite {
get { return this.writeAllowed; }
}
/// <summary>
/// Clears all buffers for this stream and causes any buffered data to be written
/// to the underlying device.
/// </summary>
public override void Flush() {
++this.flushCallCount;
this.stream.Flush();
}
/// <summary>Length of the stream in bytes</summary>
public override long Length {
get {
enforceSeekAllowed();
return this.stream.Length;
}
}
/// <summary>Absolute position of the file pointer within the stream</summary>
/// <exception cref="NotSupportedException">
/// At least one of the chained streams does not support seeking
/// </exception>
public override long Position {
get {
enforceSeekAllowed();
return this.stream.Position;
}
set {
enforceSeekAllowed();
this.stream.Position = value;
}
}
/// <summary>
/// Reads a sequence of bytes from the stream and advances the position of
/// the file pointer by the number of bytes read.
/// </summary>
/// <param name="buffer">Buffer that will receive the data read from the stream</param>
/// <param name="offset">
/// Offset in the buffer at which the stream will place the data read
/// </param>
/// <param name="count">Maximum number of bytes that will be read</param>
/// <returns>
/// The number of bytes that were actually read from the stream and written into
/// the provided buffer
/// </returns>
public override int Read(byte[] buffer, int offset, int count) {
enforceReadAllowed();
return this.stream.Read(buffer, offset, count);
}
/// <summary>Changes the position of the file pointer</summary>
/// <param name="offset">
/// Offset to move the file pointer by, relative to the position indicated by
/// the <paramref name="origin" /> parameter.
/// </param>
/// <param name="origin">
/// Reference point relative to which the file pointer is placed
/// </param>
/// <returns>The new absolute position within the stream</returns>
public override long Seek(long offset, SeekOrigin origin) {
enforceSeekAllowed();
return this.stream.Seek(offset, origin);
}
/// <summary>Changes the length of the stream</summary>
/// <param name="value">New length the stream shall have</param>
public override void SetLength(long value) {
enforceSeekAllowed();
this.stream.SetLength(value);
}
/// <summary>
/// Writes a sequence of bytes to the stream and advances the position of
/// the file pointer by the number of bytes written.
/// </summary>
/// <param name="buffer">
/// Buffer containing the data that will be written to the stream
/// </param>
/// <param name="offset">
/// Offset in the buffer at which the data to be written starts
/// </param>
/// <param name="count">Number of bytes that will be written into the stream</param>
public override void Write(byte[] buffer, int offset, int count) {
enforceWriteAllowed();
this.stream.Write(buffer, offset, count);
}
/// <summary>Number of times the Flush() method has been called</summary>
public int FlushCallCount {
get { return this.flushCallCount; }
}
/// <summary>Throws an exception if reading is not allowed</summary>
private void enforceReadAllowed() {
if(!this.readAllowed) {
throw new NotSupportedException("Reading has been disabled");
}
}
/// <summary>Throws an exception if writing is not allowed</summary>
private void enforceWriteAllowed() {
if(!this.writeAllowed) {
throw new NotSupportedException("Writing has been disabled");
}
}
/// <summary>Throws an exception if seeking is not allowed</summary>
private void enforceSeekAllowed() {
if(!this.seekAllowed) {
throw new NotSupportedException("Seeking has been disabled");
}
}
/// <summary>Stream being wrapped for testing</summary>
private Stream stream;
/// <summary>whether to allow reading from the wrapped stream</summary>
private bool readAllowed;
/// <summary>Whether to allow writing to the wrapped stream</summary>
private bool writeAllowed;
/// <summary>Whether to allow seeking within the wrapped stream</summary>
private bool seekAllowed;
/// <summary>Number of times the Flush() method has been called</summary>
private int flushCallCount;
}
#endregion // class TestStream
/// <summary>Tests whether the partial stream constructor is working</summary>
[Test]
public void TestConstructor() {
using(MemoryStream memoryStream = new MemoryStream()) {
memoryStream.SetLength(123);
PartialStream partialStream = new PartialStream(memoryStream, 23, 100);
Assert.AreEqual(100, partialStream.Length);
}
}
/// <summary>
/// Verifies that the partial stream constructor throws an exception if
/// it's invoked with an invalid start offset
/// </summary>
[Test, ExpectedException(typeof(ArgumentException))]
public void TestThrowOnInvalidStartInConstructor() {
using(MemoryStream memoryStream = new MemoryStream()) {
memoryStream.SetLength(123);
PartialStream partialStream = new PartialStream(memoryStream, -1, 10);
Assert.AreNotEqual(partialStream.Length, partialStream.Length);
}
}
/// <summary>
/// Verifies that the partial stream constructor throws an exception if
/// it's invoked with an invalid start offset
/// </summary>
[Test, ExpectedException(typeof(ArgumentException))]
public void TestThrowOnInvalidLengthInConstructor() {
using(MemoryStream memoryStream = new MemoryStream()) {
memoryStream.SetLength(123);
PartialStream partialStream = new PartialStream(memoryStream, 100, 24);
Assert.AreNotEqual(partialStream.Length, partialStream.Length);
}
}
/// <summary>
/// Verifies that the partial stream constructor throws an exception if
/// it's invoked with a start offset on an unseekable stream
/// </summary>
[Test, ExpectedException(typeof(ArgumentException))]
public void TestThrowOnUnseekableStreamWithOffsetInConstructor() {
using(MemoryStream memoryStream = new MemoryStream()) {
memoryStream.SetLength(123);
TestStream testStream = new TestStream(memoryStream, true, true, false);
PartialStream partialStream = new PartialStream(testStream, 23, 100);
Assert.AreNotEqual(partialStream.Length, partialStream.Length);
}
}
/// <summary>
/// Tests whether the CanRead property reports its status correctly
/// </summary>
[Test]
public void TestCanReadProperty() {
using(MemoryStream memoryStream = new MemoryStream()) {
TestStream yesStream = new TestStream(memoryStream, true, true, true);
TestStream noStream = new TestStream(memoryStream, false, true, true);
Assert.IsTrue(new PartialStream(yesStream, 0, 0).CanRead);
Assert.IsFalse(new PartialStream(noStream, 0, 0).CanRead);
}
}
/// <summary>
/// Tests whether the CanWrite property reports its status correctly
/// </summary>
[Test]
public void TestCanWriteProperty() {
using(MemoryStream memoryStream = new MemoryStream()) {
TestStream yesStream = new TestStream(memoryStream, true, true, true);
TestStream noStream = new TestStream(memoryStream, true, false, true);
Assert.IsTrue(new PartialStream(yesStream, 0, 0).CanWrite);
Assert.IsFalse(new PartialStream(noStream, 0, 0).CanWrite);
}
}
/// <summary>
/// Tests whether the CanSeek property reports its status correctly
/// </summary>
[Test]
public void TestCanSeekProperty() {
using(MemoryStream memoryStream = new MemoryStream()) {
TestStream yesStream = new TestStream(memoryStream, true, true, true);
TestStream noStream = new TestStream(memoryStream, true, true, false);
Assert.IsTrue(new PartialStream(yesStream, 0, 0).CanSeek);
Assert.IsFalse(new PartialStream(noStream, 0, 0).CanSeek);
}
}
/// <summary>
/// Tests whether the CompleteStream property returns the original stream
/// </summary>
[Test]
public void TestCompleteStreamProperty() {
using(MemoryStream memoryStream = new MemoryStream()) {
PartialStream partialStream = new PartialStream(memoryStream, 0, 0);
Assert.AreSame(memoryStream, partialStream.CompleteStream);
}
}
/// <summary>Tests whether the Flush() method can be called</summary>
[Test]
public void TestFlush() {
using(MemoryStream memoryStream = new MemoryStream()) {
PartialStream partialStream = new PartialStream(memoryStream, 0, 0);
partialStream.Flush();
}
}
/// <summary>
/// Tests whether the Position property correctly reports the file pointer position
/// </summary>
[Test]
public void TestGetPosition() {
using(MemoryStream memoryStream = new MemoryStream()) {
memoryStream.SetLength(123);
PartialStream partialStream = new PartialStream(memoryStream, 23, 100);
Assert.AreEqual(0, partialStream.Position);
byte[] test = new byte[10];
int bytesRead = partialStream.Read(test, 0, 10);
Assert.AreEqual(10, bytesRead);
Assert.AreEqual(10, partialStream.Position);
bytesRead = partialStream.Read(test, 0, 10);
Assert.AreEqual(10, bytesRead);
Assert.AreEqual(20, partialStream.Position);
}
}
/// <summary>
/// Tests whether the Position property is correctly updated
/// </summary>
[Test]
public void TestSetPosition() {
using(MemoryStream memoryStream = new MemoryStream()) {
memoryStream.SetLength(123);
PartialStream partialStream = new PartialStream(memoryStream, 23, 100);
Assert.AreEqual(0, partialStream.Position);
partialStream.Position = 7;
Assert.AreEqual(partialStream.Position, 7);
partialStream.Position = 14;
Assert.AreEqual(partialStream.Position, 14);
}
}
/// <summary>
/// Tests whether the Position property throws an exception if the stream does
/// not support seeking.
/// </summary>
[Test, ExpectedException(typeof(NotSupportedException))]
public void TestThrowOnGetPositionOnUnseekableStream() {
using(MemoryStream memoryStream = new MemoryStream()) {
TestStream testStream = new TestStream(memoryStream, true, true, false);
PartialStream partialStream = new PartialStream(testStream, 0, 0);
Assert.AreEqual(-12345, partialStream.Position);
}
}
/// <summary>
/// Tests whether the Position property throws an exception if the stream does
/// not support seeking.
/// </summary>
[Test, ExpectedException(typeof(NotSupportedException))]
public void TestThrowOnSetPositionOnUnseekableStream() {
using(MemoryStream memoryStream = new MemoryStream()) {
TestStream testStream = new TestStream(memoryStream, true, true, false);
PartialStream partialStream = new PartialStream(testStream, 0, 0);
partialStream.Position = 0;
}
}
/// <summary>
/// Tests whether the Read() method throws an exception if the stream does
/// not support reading
/// </summary>
[Test, ExpectedException(typeof(NotSupportedException))]
public void TestThrowOnReadFromUnreadableStream() {
using(MemoryStream memoryStream = new MemoryStream()) {
TestStream testStream = new TestStream(memoryStream, false, true, true);
PartialStream partialStream = new PartialStream(testStream, 0, 0);
byte[] test = new byte[10];
int bytesRead = partialStream.Read(test, 0, 10);
Assert.AreNotEqual(bytesRead, bytesRead);
}
}
/// <summary>
/// Tests whether the Seek() method of the partial stream is working
/// </summary>
[Test]
public void TestSeeking() {
using(MemoryStream memoryStream = new MemoryStream()) {
memoryStream.SetLength(20);
PartialStream partialStream = new PartialStream(memoryStream, 0, 20);
Assert.AreEqual(7, partialStream.Seek(-13, SeekOrigin.End));
Assert.AreEqual(14, partialStream.Seek(7, SeekOrigin.Current));
Assert.AreEqual(11, partialStream.Seek(11, SeekOrigin.Begin));
}
}
/// <summary>
/// Tests whether the Seek() method throws an exception if an invalid
/// reference point is provided
/// </summary>
[Test, ExpectedException(typeof(ArgumentException))]
public void TestThrowOnInvalidSeekReferencePoint() {
using(MemoryStream memoryStream = new MemoryStream()) {
PartialStream partialStream = new PartialStream(memoryStream, 0, 0);
partialStream.Seek(1, (SeekOrigin)12345);
}
}
/// <summary>
/// Verifies that the partial stream throws an exception if the attempt is
/// made to change the length of the stream
/// </summary>
[Test, ExpectedException(typeof(NotSupportedException))]
public void TestThrowOnLengthChange() {
using(MemoryStream memoryStream = new MemoryStream()) {
PartialStream partialStream = new PartialStream(memoryStream, 0, 0);
partialStream.SetLength(123);
}
}
/// <summary>
/// Tests whether the Read() method returns 0 bytes if the attempt is made
/// to read data from an invalid position
/// </summary>
[Test]
public void TestReadFromInvalidPosition() {
using(MemoryStream memoryStream = new MemoryStream()) {
memoryStream.SetLength(123);
memoryStream.Position = 1123;
byte[] test = new byte[10];
Assert.AreEqual(0, memoryStream.Read(test, 0, 10));
}
}
/// <summary>Verifies that the Read() method is working</summary>
[Test]
public void TestReadFromPartialStream() {
using(MemoryStream memoryStream = new MemoryStream()) {
memoryStream.SetLength(123);
memoryStream.Position = 100;
memoryStream.Write(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }, 0, 10);
PartialStream partialStream = new PartialStream(memoryStream, 95, 10);
byte[] buffer = new byte[15];
int bytesRead = partialStream.Read(buffer, 0, 15);
Assert.AreEqual(10, bytesRead);
Assert.AreEqual(
new byte[] { 0, 0, 0, 0, 0, 1, 2, 3, 4, 5, 0, 0, 0, 0, 0 }, buffer
);
}
}
/// <summary>Verifies that the Write() method is working</summary>
[Test]
public void TestWriteToPartialStream() {
using(MemoryStream memoryStream = new MemoryStream()) {
memoryStream.SetLength(123);
memoryStream.Position = 60;
memoryStream.Write(new byte[] { 11, 12, 13, 14, 15 }, 0, 5);
PartialStream partialStream = new PartialStream(memoryStream, 50, 15);
partialStream.Position = 3;
partialStream.Write(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }, 0, 10);
byte[] buffer = new byte[17];
memoryStream.Position = 49;
int bytesRead = memoryStream.Read(buffer, 0, 17);
Assert.AreEqual(17, bytesRead);
Assert.AreEqual(
new byte[] { 0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 14, 15, 0 },
buffer
);
}
}
/// <summary>
/// Verifies that an exception is thrown if the Write() method of the partial stream
/// is attempted to be used to extend the partial stream's length
/// </summary>
[Test, ExpectedException(typeof(NotSupportedException))]
public void TestThrowOnExtendPartialStream() {
using(MemoryStream memoryStream = new MemoryStream()) {
memoryStream.SetLength(25);
PartialStream partialStream = new PartialStream(memoryStream, 10, 10);
partialStream.Position = 5;
partialStream.Write(new byte[] { 1, 2, 3, 4, 5, 6 }, 0, 6);
}
}
}
} // namespace Nuclex.Support.IO
#endif // UNITTEST

264
Source/IO/PartialStream.cs Normal file
View File

@ -0,0 +1,264 @@
#region CPL License
/*
Nuclex Framework
Copyright (C) 2002-2009 Nuclex Development Labs
This library is free software; you can redistribute it and/or
modify it under the terms of the IBM Common Public License as
published by the IBM Corporation; either version 1.0 of the
License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
IBM Common Public License for more details.
You should have received a copy of the IBM Common Public
License along with this library
*/
#endregion
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
namespace Nuclex.Support.IO {
/// <summary>Wraps a stream and exposes only a limited region of its data</summary>
public class PartialStream : Stream {
/// <summary>Initializes a new partial stream</summary>
/// <param name="stream">
/// Stream the wrapper will make a limited region accessible of
/// </param>
/// <param name="start">
/// Start index in the stream which becomes the beginning for the wrapper
/// </param>
/// <param name="length">
/// Length the wrapped stream should report and allow access to
/// </param>
public PartialStream(Stream stream, long start, long length) {
if(start < 0) {
throw new ArgumentException("Start index must not be less than 0", "start");
}
if(!stream.CanSeek) {
if(start != 0) {
throw new ArgumentException(
"The only valid start for unseekable streams is 0", "start"
);
}
} else {
if(start + length > stream.Length) {
throw new ArgumentException(
"Partial stream exceeds end of full stream", "length"
);
}
}
this.stream = stream;
this.start = start;
this.length = length;
}
/// <summary>Whether data can be read from the stream</summary>
public override bool CanRead {
get { return this.stream.CanRead; }
}
/// <summary>Whether the stream supports seeking</summary>
public override bool CanSeek {
get { return this.stream.CanSeek; }
}
/// <summary>Whether data can be written into the stream</summary>
public override bool CanWrite {
get { return this.stream.CanWrite; }
}
/// <summary>
/// Clears all buffers for this stream and causes any buffered data to be written
/// to the underlying device.
/// </summary>
public override void Flush() {
this.stream.Flush();
}
/// <summary>Length of the stream in bytes</summary>
/// <exception cref="NotSupportedException">
/// The wrapped stream does not support seeking
/// </exception>
public override long Length {
get { return this.length; }
}
/// <summary>Absolute position of the file pointer within the stream</summary>
/// <exception cref="NotSupportedException">
/// The wrapped stream does not support seeking
/// </exception>
public override long Position {
get {
if(!this.stream.CanSeek) {
throw makeSeekNotSupportedException("seek");
}
return this.position;
}
set { moveFilePointer(value); }
}
/// <summary>
/// Reads a sequence of bytes from the stream and advances the position of
/// the file pointer by the number of bytes read.
/// </summary>
/// <param name="buffer">Buffer that will receive the data read from the stream</param>
/// <param name="offset">
/// Offset in the buffer at which the stream will place the data read
/// </param>
/// <param name="count">Maximum number of bytes that will be read</param>
/// <returns>
/// The number of bytes that were actually read from the stream and written into
/// the provided buffer
/// </returns>
/// <exception cref="NotSupportedException">
/// The wrapped stream does not support reading
/// </exception>
public override int Read(byte[] buffer, int offset, int count) {
if(!this.stream.CanRead) {
throw new NotSupportedException(
"Can't read: the wrapped stream doesn't support reading"
);
}
long remaining = this.length - this.position;
int bytesToRead = (int)Math.Min(count, remaining);
if(this.stream.CanSeek) {
this.stream.Position = this.position + this.start;
}
int bytesRead = this.stream.Read(buffer, offset, bytesToRead);
this.position += bytesRead;
return bytesRead;
}
/// <summary>Changes the position of the file pointer</summary>
/// <param name="offset">
/// Offset to move the file pointer by, relative to the position indicated by
/// the <paramref name="origin" /> parameter.
/// </param>
/// <param name="origin">
/// Reference point relative to which the file pointer is placed
/// </param>
/// <returns>The new absolute position within the stream</returns>
public override long Seek(long offset, SeekOrigin origin) {
switch(origin) {
case SeekOrigin.Begin: {
return Position = offset;
}
case SeekOrigin.Current: {
return Position += offset;
}
case SeekOrigin.End: {
return Position = (Length + offset);
}
default: {
throw new ArgumentException("Invalid seek origin", "origin");
}
}
}
/// <summary>Changes the length of the stream</summary>
/// <param name="value">New length the stream shall have</param>
/// <exception cref="NotSupportedException">
/// Always, the stream chainer does not support the SetLength() operation
/// </exception>
public override void SetLength(long value) {
throw new NotSupportedException("Resizing partial streams is not supported");
}
/// <summary>
/// Writes a sequence of bytes to the stream and advances the position of
/// the file pointer by the number of bytes written.
/// </summary>
/// <param name="buffer">
/// Buffer containing the data that will be written to the stream
/// </param>
/// <param name="offset">
/// Offset in the buffer at which the data to be written starts
/// </param>
/// <param name="count">Number of bytes that will be written into the stream</param>
/// <remarks>
/// The behavior of this method is as follows: If one or more chained streams
/// do not support seeking, all data is appended to the final stream in the
/// chain. Otherwise, writing will begin with the stream the current file pointer
/// offset falls into. If the end of that stream is reached, writing continues
/// in the next stream. On the last stream, writing more data into the stream
/// that it current size allows will enlarge the stream.
/// </remarks>
public override void Write(byte[] buffer, int offset, int count) {
long remaining = this.length - this.position;
if(count > remaining) {
throw new NotSupportedException(
"Cannot extend the length of the partial stream"
);
}
if(this.stream.CanSeek) {
this.stream.Position = this.position + this.start;
}
this.stream.Write(buffer, offset, count);
this.position += count;
}
/// <summary>Stream being wrapped by the partial stream wrapper</summary>
public Stream CompleteStream {
get { return this.stream; }
}
/// <summary>Moves the file pointer</summary>
/// <param name="position">New position the file pointer will be moved to</param>
private void moveFilePointer(long position) {
if(!this.stream.CanSeek) {
throw makeSeekNotSupportedException("seek");
}
// Seemingly, it is okay to move the file pointer beyond the end of
// the stream until you try to Read() or Write()
this.position = position;
}
/// <summary>
/// Constructs a NotSupportException for an error caused by the wrapped
/// stream having no seek support
/// </summary>
/// <param name="action">Action that was tried to perform</param>
/// <returns>The newly constructed NotSupportedException</returns>
private static NotSupportedException makeSeekNotSupportedException(string action) {
return new NotSupportedException(
string.Format(
"Can't {0}: the wrapped stream does not support seeking",
action
)
);
}
/// <summary>Streams that have been chained together</summary>
private Stream stream;
/// <summary>Start index of the partial stream in the wrapped stream</summary>
private long start;
/// <summary>Zero-based position of the partial stream's file pointer</summary>
/// <remarks>
/// If the stream does not support seeking, the position will simply be counted
/// up until it reaches <see cref="PartialStream.length" />.
/// </remarks>
private long position;
/// <summary>Length of the partial stream</summary>
private long length;
}
} // namespace Nuclex.Support.IO

View File

@ -119,10 +119,10 @@ namespace Nuclex.Support.Scheduling {
copy(this, eventArguments); copy(this, eventArguments);
} }
/// <summary>Prepares the current operation and calls its Begin() method</summary> /// <summary>Prepares the current operation and calls its Start() method</summary>
/// <remarks> /// <remarks>
/// This subscribes the queue to the events of to the current operation /// This subscribes the queue to the events of to the current operation
/// and launches the operation by calling its Begin() method. /// and launches the operation by calling its Start() method.
/// </remarks> /// </remarks>
private void startCurrentOperation() { private void startCurrentOperation() {
OperationType operation = this.children[this.currentOperationIndex].Transaction; OperationType operation = this.children[this.currentOperationIndex].Transaction;
@ -168,8 +168,8 @@ namespace Nuclex.Support.Scheduling {
/// <summary>Called when the current executing operation ends</summary> /// <summary>Called when the current executing operation ends</summary>
/// <param name="sender">Operation that ended</param> /// <param name="sender">Operation that ended</param>
/// <param name="e">Not used</param> /// <param name="arguments">Not used</param>
private void asyncOperationEnded(object sender, EventArgs e) { private void asyncOperationEnded(object sender, EventArgs arguments) {
// Unsubscribe from the current operation's events and update the // Unsubscribe from the current operation's events and update the
// accumulating progress counter // accumulating progress counter
@ -196,16 +196,17 @@ namespace Nuclex.Support.Scheduling {
/// <summary>Called when currently executing operation makes progress</summary> /// <summary>Called when currently executing operation makes progress</summary>
/// <param name="sender">Operation that has achieved progress</param> /// <param name="sender">Operation that has achieved progress</param>
/// <param name="e">Not used</param> /// <param name="arguments">Not used</param>
private void asyncOperationProgressChanged(object sender, ProgressReportEventArgs e) { private void asyncOperationProgressChanged(
object sender, ProgressReportEventArgs arguments
) {
// Determine the completed weight of the currently executing operation // Determine the completed weight of the currently executing operation
float currentOperationCompletedWeight = float operationWeight = this.children[this.currentOperationIndex].Weight;
e.Progress * this.children[this.currentOperationIndex].Weight; float operationCompletedWeight = arguments.Progress * operationWeight;
// Build the total normalized amount of progress for the queue // Build the total normalized amount of progress for the queue
float progress = float progress = (this.completedWeight + operationCompletedWeight) / this.totalWeight;
(this.completedWeight + currentOperationCompletedWeight) / this.totalWeight;
// Done, we can send the actual progress to any event subscribers // Done, we can send the actual progress to any event subscribers
OnAsyncProgressChanged(progress); OnAsyncProgressChanged(progress);