#region Apache License 2.0
/*
Nuclex .NET Framework
Copyright (C) 2002-2024 Markus Ewald / Nuclex Development Labs

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#endregion // Apache License 2.0

using System;
using System.Diagnostics;
using System.IO;

namespace Nuclex.Support.IO {

  /// <summary>Chains a series of independent streams into a single stream</summary>
  /// <remarks>
  ///   <para>
  ///     This class can be used to chain multiple independent streams into a single
  ///     stream that acts as if its chained streams were only one combined stream.
  ///     It is useful to avoid creating huge memory streams or temporary files when
  ///     you just need to prepend or append some data to a stream or if you need to
  ///     read a file that was split into several parts as if it was a single file.
  ///   </para>
  ///   <para>
  ///     It is not recommended to change the size of any chained stream after it
  ///     has become part of a stream chainer, though the stream chainer will do its
  ///     best to cope with the changes as they occur. Increasing the length of a
  ///     chained stream is generally not an issue for streams that support seeking,
  ///     but reducing the length might invalidate the stream chainer's file pointer,
  ///     resulting in an IOException when Read() or Write() is next called.
  ///   </para>
  /// </remarks>
  public class ChainStream : Stream {

    /// <summary>Initializes a new stream chainer</summary>
    /// <param name="streams">Array of streams that will be chained together</param>
    /// <exception cref="ArgumentNullException">The stream array is null</exception>
    public ChainStream(params Stream[] streams) {
      if(streams == null) {
        throw new ArgumentNullException("streams");
      }

      // Copy the array so later modifications by the caller can't affect us
      this.streams = (Stream[])streams.Clone();
      determineCapabilities();
    }

    /// <summary>Whether data can be read from the stream</summary>
    public override bool CanRead {
      get { return this.allStreamsCanRead; }
    }

    /// <summary>Whether the stream supports seeking</summary>
    public override bool CanSeek {
      get { return this.allStreamsCanSeek; }
    }

    /// <summary>Whether data can be written into the stream</summary>
    public override bool CanWrite {
      get { return this.allStreamsCanWrite; }
    }

    /// <summary>
    ///   Clears all buffers for this stream and causes any buffered data to be written
    ///   to the underlying device.
    /// </summary>
    public override void Flush() {
      for(int index = 0; index < this.streams.Length; ++index) {
        this.streams[index].Flush();
      }
    }

    /// <summary>Length of the stream in bytes</summary>
    /// <exception cref="NotSupportedException">
    ///   At least one of the chained streams does not support seeking
    /// </exception>
    public override long Length {
      get {
        if(!this.allStreamsCanSeek) {
          throw makeSeekNotSupportedException("determine length");
        }

        // Sum up the length of all chained streams
        long length = 0;
        for(int index = 0; index < this.streams.Length; ++index) {
          length += this.streams[index].Length;
        }

        return length;
      }
    }

    /// <summary>Absolute position of the file pointer within the stream</summary>
    /// <exception cref="NotSupportedException">
    ///   At least one of the chained streams does not support seeking
    /// </exception>
    public override long Position {
      get {
        if(!this.allStreamsCanSeek) {
          throw makeSeekNotSupportedException("seek");
        }

        return this.position;
      }
      set { moveFilePointer(value); }
    }

    /// <summary>
    ///   Reads a sequence of bytes from the stream and advances the position of
    ///   the file pointer by the number of bytes read.
    /// </summary>
    /// <param name="buffer">Buffer that will receive the data read from the stream</param>
    /// <param name="offset">
    ///   Offset in the buffer at which the stream will place the data read
    /// </param>
    /// <param name="count">Maximum number of bytes that will be read</param>
    /// <returns>
    ///   The number of bytes that were actually read from the stream and written into
    ///   the provided buffer
    /// </returns>
    /// <exception cref="NotSupportedException">
    ///   The chained stream at the current position does not support reading
    /// </exception>
    public override int Read(byte[] buffer, int offset, int count) {
      if(!this.allStreamsCanRead) {
        throw new NotSupportedException(
          "Can't read: at least one of the chained streams doesn't support reading"
        );
      }

      int totalBytesRead = 0;
      int lastStreamIndex = this.streams.Length - 1;

      if(this.allStreamsCanSeek) {

        // Find out from which stream and at which position we need to begin reading
        int streamIndex;
        long streamOffset;
        findStreamIndexAndOffset(this.position, out streamIndex, out streamOffset);

        // Try to read from the stream our current file pointer falls into. If more
        // data was requested than the stream contains, read each stream to its end
        // until we either have enough data or run out of streams.
        while(count > 0) {
          Stream currentStream = this.streams[streamIndex];

          // Read up to count bytes from the current stream. Count is decreased each
          // time we successfully get data and holds the number of bytes remaining
          // to be read. The remaining length is clamped to zero so that a file
          // pointer that was moved beyond the end of the final stream produces an
          // ordinary end-of-stream read (0 bytes) instead of a negative count
          // being passed to Read().
          long maximumBytes = Math.Min(
            (long)count, Math.Max(0L, currentStream.Length - streamOffset)
          );
          currentStream.Position = streamOffset;
          int bytesRead = currentStream.Read(buffer, offset, (int)maximumBytes);

          // Accumulate the total number of bytes we read for the return value
          totalBytesRead += bytesRead;

          // If the stream returned partial data, stop here. Also, if this was the
          // last stream we queried, this is as far as we can go.
          if((bytesRead < maximumBytes) || (streamIndex == lastStreamIndex)) {
            break;
          }

          // Move on to the next stream in the chain
          ++streamIndex;
          streamOffset = 0;
          count -= bytesRead;
          offset += bytesRead;
        }

        this.position += totalBytesRead;

      } else {

        // Try to read from the active read stream. If the end of the active read
        // stream is reached, switch to the next stream in the chain until we have
        // no more streams left to read from
        while(this.activeReadStreamIndex <= lastStreamIndex) {

          // Try to read from the stream. The stream can either return any amount
          // of data > 0 if there's still data left to be read or 0 if the end of
          // the stream was reached
          Stream activeStream = this.streams[this.activeReadStreamIndex];
          if(activeStream.CanSeek) {
            activeStream.Position = this.activeReadStreamPosition;
          }
          totalBytesRead = activeStream.Read(buffer, offset, count);

          // If we got any data, we're done, exit the loop
          if(totalBytesRead != 0) {
            break;
          } else { // Otherwise, go to the next stream in the chain
            this.activeReadStreamPosition = 0;
            ++this.activeReadStreamIndex;
          }

        }

        this.activeReadStreamPosition += totalBytesRead;

      }

      return totalBytesRead;
    }

    /// <summary>Changes the position of the file pointer</summary>
    /// <param name="offset">
    ///   Offset to move the file pointer by, relative to the position indicated by
    ///   the <paramref name="origin" /> parameter.
    /// </param>
    /// <param name="origin">
    ///   Reference point relative to which the file pointer is placed
    /// </param>
    /// <returns>The new absolute position within the stream</returns>
    public override long Seek(long offset, SeekOrigin origin) {
      switch(origin) {
        case SeekOrigin.Begin: {
          return Position = offset;
        }
        case SeekOrigin.Current: {
          return Position += offset;
        }
        case SeekOrigin.End: {
          return Position = (Length + offset);
        }
        default: {
          throw new ArgumentException("Invalid seek origin", "origin");
        }
      }
    }

    /// <summary>Changes the length of the stream</summary>
    /// <param name="value">New length the stream shall have</param>
    /// <exception cref="NotSupportedException">
    ///   Always, the stream chainer does not support the SetLength() operation
    /// </exception>
    public override void SetLength(long value) {
      throw new NotSupportedException("Resizing chained streams is not supported");
    }

    /// <summary>
    ///   Writes a sequence of bytes to the stream and advances the position of
    ///   the file pointer by the number of bytes written.
    /// </summary>
    /// <param name="buffer">
    ///   Buffer containing the data that will be written to the stream
    /// </param>
    /// <param name="offset">
    ///   Offset in the buffer at which the data to be written starts
    /// </param>
    /// <param name="count">Number of bytes that will be written into the stream</param>
    /// <remarks>
    ///   The behavior of this method is as follows: If one or more chained streams
    ///   do not support seeking, all data is appended to the final stream in the
    ///   chain. Otherwise, writing will begin with the stream the current file pointer
    ///   offset falls into. If the end of that stream is reached, writing continues
    ///   in the next stream. On the last stream, writing more data into the stream
    ///   than its current size allows will enlarge the stream.
    /// </remarks>
    public override void Write(byte[] buffer, int offset, int count) {
      if(!this.allStreamsCanWrite) {
        throw new NotSupportedException(
          "Can't write: at least one of the chained streams doesn't support writing"
        );
      }

      int remaining = count;

      // If seeking is supported, we can write into the mid of the stream,
      // if the user so desires
      if(this.allStreamsCanSeek) {

        // Find out in which stream and at which position we need to begin writing
        int streamIndex;
        long streamOffset;
        findStreamIndexAndOffset(this.position, out streamIndex, out streamOffset);

        // Write data into the streams, switching over to the next stream if data is
        // too large to fit into the current stream, until all data is spent.
        int lastStreamIndex = this.streams.Length - 1;
        while(remaining > 0) {
          Stream currentStream = this.streams[streamIndex];

          // If this is the last stream, just write. If the data is larger than the last
          // stream's remaining bytes, it will append to that stream, enlarging it.
          if(streamIndex == lastStreamIndex) {

            // Write all remaining data into the last stream
            currentStream.Position = streamOffset;
            currentStream.Write(buffer, offset, remaining);
            remaining = 0;

          } else { // We're writing into a stream that's followed by another stream

            // Find out how much data we can put into the current stream without
            // enlarging it (if seeking is supported, so is the Length property)
            long currentStreamRemaining = currentStream.Length - streamOffset;
            int bytesToWrite = (int)Math.Min((long)remaining, currentStreamRemaining);

            // Write all data that can fit into the current stream
            currentStream.Position = streamOffset;
            currentStream.Write(buffer, offset, bytesToWrite);

            // Adjust the offsets and count for the next stream
            offset += bytesToWrite;
            remaining -= bytesToWrite;
            streamOffset = 0;
            ++streamIndex;

          }
        }

      } else { // Seeking not supported, append everything to the last stream

        Stream lastStream = this.streams[this.streams.Length - 1];
        if(lastStream.CanSeek) {
          lastStream.Seek(0, SeekOrigin.End);
        }
        lastStream.Write(buffer, offset, remaining);

      }

      this.position += count;
    }

    /// <summary>Streams being combined by the stream chainer</summary>
    public Stream[] ChainedStreams {
      get { return this.streams; }
    }

    /// <summary>Moves the file pointer</summary>
    /// <param name="position">New position the file pointer will be moved to</param>
    private void moveFilePointer(long position) {
      if(!this.allStreamsCanSeek) {
        throw makeSeekNotSupportedException("seek");
      }

      // Seemingly, it is okay to move the file pointer beyond the end of
      // the stream until you try to Read() or Write()
      this.position = position;
    }

    /// <summary>
    ///   Finds the stream index and local offset for an absolute position within
    ///   the combined streams.
    /// </summary>
    /// <param name="overallPosition">Absolute position within the combined streams</param>
    /// <param name="streamIndex">
    ///   Index of the stream the overall position falls into
    /// </param>
    /// <param name="streamPosition">
    ///   Local position within the stream indicated by <paramref name="streamIndex" />
    /// </param>
    private void findStreamIndexAndOffset(
      long overallPosition, out int streamIndex, out long streamPosition
    ) {
      Debug.Assert(
        this.allStreamsCanSeek, "Call to findStreamIndexAndOffset() but no seek support"
      );

      // In case the position is beyond the stream's end, this is what we will
      // return to the caller
      streamIndex = (this.streams.Length - 1);

      // Search until we have found the stream the position must lie in
      for(int index = 0; index < this.streams.Length; ++index) {
        long streamLength = this.streams[index].Length;

        if(overallPosition < streamLength) {
          streamIndex = index;
          break;
        }

        overallPosition -= streamLength;
      }

      // The overall position will have been decreased by each skipped stream's length,
      // so it should now contain the local position for the final stream we checked.
      streamPosition = overallPosition;
    }

    /// <summary>Determines the capabilities of the chained streams</summary>
    /// <remarks>
    ///   <para>
    ///     Theoretically, it would be possible to create a stream chainer that supported
    ///     writing only when the file pointer was on a chained stream with write support,
    ///     that could seek within the beginning of the stream until the first chained
    ///     stream with no seek capability was encountered and so on.
    ///   </para>
    ///   <para>
    ///     However, the interface of the Stream class requires us to make a definitive
    ///     statement as to whether the Stream supports seeking, reading and writing.
    ///     We can't return "maybe" or "mostly" in CanSeek, so the only sane choice that
    ///     doesn't violate the Stream interface is to implement these capabilities as
    ///     all or nothing - either all streams support a feature, or the stream chainer
    ///     will report the feature as unsupported.
    ///   </para>
    /// </remarks>
    private void determineCapabilities() {
      this.allStreamsCanSeek = true;
      this.allStreamsCanRead = true;
      this.allStreamsCanWrite = true;

      for(int index = 0; index < this.streams.Length; ++index) {
        this.allStreamsCanSeek &= this.streams[index].CanSeek;
        this.allStreamsCanRead &= this.streams[index].CanRead;
        this.allStreamsCanWrite &= this.streams[index].CanWrite;
      }
    }

    /// <summary>
    ///   Constructs a NotSupportException for an error caused by one of the chained
    ///   streams having no seek support
    /// </summary>
    /// <param name="action">Action that was tried to perform</param>
    /// <returns>The newly constructed NotSupportedException</returns>
    private static NotSupportedException makeSeekNotSupportedException(string action) {
      return new NotSupportedException(
        string.Format(
          "Can't {0}: at least one of the chained streams does not support seeking",
          action
        )
      );
    }

    /// <summary>Streams that have been chained together</summary>
    private Stream[] streams;
    /// <summary>Current position of the overall file pointer</summary>
    private long position;
    /// <summary>Stream we're currently reading from if seeking is not supported</summary>
    /// <remarks>
    ///   If seeking is not supported, the stream chainer will read from each stream
    ///   until the end was reached sequentially
    /// </remarks>
    private int activeReadStreamIndex;
    /// <summary>Position in the current read stream if seeking is not supported</summary>
    /// <remarks>
    ///   If there is a mix of streams supporting seeking and not supporting seeking, we
    ///   need to keep track of the read index for those streams that do. If, for example,
    ///   the last stream is written to and read from in succession, the file pointer
    ///   of that stream would have been moved to the end by the write attempt, skipping
    ///   data that should have been read in the following read attempt.
    /// </remarks>
    private long activeReadStreamPosition;
    /// <summary>Whether all of the chained streams support seeking</summary>
    private bool allStreamsCanSeek;
    /// <summary>Whether all of the chained streams support reading</summary>
    private bool allStreamsCanRead;
    /// <summary>Whether all of the chained streams support writing</summary>
    private bool allStreamsCanWrite;

  }

} // namespace Nuclex.Support.IO