I had a requirement the other day to transfer data sequentially through a ‘pipe’ of components, and decided to implement a simple asynchronous “ring buffer” that could act as a medium for the transfer. The implementation itself relies on the Disposable base class posted in my previous article, and following on it (which I’ll put up in subsequent posts) I’ve added some classes to wrap the read/write transfer logic into something more usable. Nevertheless, I’ve come to value this little gem as the core component of a very useful data-transfer buffer.

using System;
using System.Threading;

namespace PatternExplorer {
using System;
using System.Threading;

namespace PatternExplorer {
   /// <summary>
   /// Provides a thread-safe fixed-size read-write data buffer. 
   /// </summary>
   /// <remarks>
   ///   Reading from an empty buffer will wait until data is written. 
   ///   Writing to a full buffer will wait until data is read.
   /// </remarks>
   public sealed class RingBuffer : Disposable {
      /// <summary>
      /// The default buffer size.
      /// </summary>
      private const int DEFAULT_SIZE = 1024;

      /// <summary>
      /// Storage for the ring buffer contents.
      /// </summary>
      private readonly byte[] _Data;

So then first off, as I mentioned, I’m inheriting from my Disposable base class, which implements IDisposable and wraps it neatly with a space for me to DisposeManagedFields and a convenient way to ThrowIfDisposed from my member functions.

From the class (XML-Doc) comment remarks, you’ll note that this implementation will follow the standard rules of the producer-consumer problem, in that readers will block whenever the buffer is empty, and writers will block whenever it’s full.

Getting to the code then, I’ve defined a DEFAULT_SIZE for the buffer (being 1024 bytes, or 1kb) and created the internal byte array that will hold our in-flight _Data.

      /// <summary>
      /// An instance lock object.
      /// </summary>
      private readonly object _ModificationKey;

      /// <summary>
      /// The readers gate.
      /// </summary>
      private readonly ManualResetEvent _HasData;

      /// <summary>
      /// The writers gate.
      /// </summary>
      private readonly ManualResetEvent _HasCapacity;

I’m using somewhat off-the-beaten-track names on my synchronization objects, primarily in an attempt to make it clear to the reader what the various fields are intended for. As such, I have an object to synchronize access to modification of the data, and two ManualResetEvent fields to signal when the buffer _HasData and when it _HasCapacity.

      /// <summary>
      /// Index for the head of the buffer.
      /// </summary>
      /// <remarks>
      ///   The index of the next byte to be <see cref="Write">written</see>.
      /// </remarks>
      private int _HeadIndex;

      /// <summary>
      /// Index for the tail of the buffer.
      /// </summary>
      /// <remarks>
      ///   The index of the next byte to be <see cref="Read">read</see>.
      /// </remarks>
      private int _TailIndex;

      /// <summary>
      /// The number of unread bytes in the buffer.
      /// </summary>
      private int _Count;
      

The above portion is fairly self-explanatory, being some fields to keep track of where the next byte should be written, where the next byte should be read, and a count of the unread bytes in the buffer.

      /// <summary>
      /// Create a new RingBuffer with a specified size.
      /// </summary>
      public RingBuffer() : this(DEFAULT_SIZE) {}

      /// <summary>
      /// Create a new RingBuffer with a specified size.
      /// </summary>
      /// <param name="size">The size of the ring buffer to create.</param>
      public RingBuffer( int size ) {
         if( size <= 0 ) {
            throw new ArgumentOutOfRangeException("size");
         }

         _Data = new byte[size];
         _ModificationKey = new object();

         // readers block until something is written
         _HasData = new ManualResetEvent(false);

         // writers may enter immediately
         _HasCapacity = new ManualResetEvent(true);
      }

      /// <summary>
      /// Finalizer.
      /// </summary>
      ~RingBuffer() {
         Dispose(false);
      }

After ensuring that we have a valid starting size for the buffer, we initialize the buffer and synchronization fields. It’s useful to note that the _HasData field is initialized as not signalled (i.e. there is no data yet) and conversely that the _HasCapacity event starts of in a signalled state (i.e. there is capacity).

      /// <summary>
      /// Gets the number of unread bytes in the buffer.
      /// </summary>
      public int Count {
         get { return _Count; }
      }

      /// <summary>
      /// Gets the number of unwritten bytes in the buffer.
      /// </summary>
      public int FreeCount {
         get { return _Data.Length - Count; }
      }

      /// <summary>
      /// Gets the total number of bytes in the buffer.
      /// </summary>
      public int Capacity {
         get { return _Data.Length; }
      }

      /// <summary>
      /// Gets a value indicating whether the buffer is empty or not.
      /// </summary>
      public bool IsEmpty {
         get { return Count == 0; }
      }

      /// <summary>
      /// Gets a value indicating whether the buffer is full or not.
      /// </summary>
      public bool IsFull {
         get { return (Count == _Data.Length); }
      }

      /// <summary>
      /// Gets a flag indicating if the buffer is closed.
      /// </summary>
      public bool IsClosed { get; private set; }

Some fairly trivial boiler-plate property implementations leads us to…

      /// <summary>
      /// Gets the element at the specified <paramref name="offset"/> from the tail index.
      /// </summary>
      /// <param name="offset">The zero-based index of the element to retrieve.</param>
      /// <returns>The requested element.</returns>
      /// <remarks>this[0] will return the same as ReadByte().</remarks>
      public byte this[int offset] {
         get {
            ThrowIfDisposed();
            if( (offset < 0) || (offset >= _Count) ) {
               throw new ArgumentOutOfRangeException("offset");
            }
            return _Data[(_TailIndex + offset) % _Data.Length];
         }
      }

… a method that is exposed in a somewhat tongue-in-cheek manner, as it probably isn’t something you would want in a production version of your software. Essentially, it simply “peeks” at a specified offset from the tail index, and it does so in a blatantly thread-unsafe way.

      /// <summary>
      /// Writes a byte to the head of the RingBuffer.
      /// </summary>
      /// <param name="value">The value to add.</param>
      public void WriteByte( byte value ) {
         ThrowIfDisposed();

         if( IsClosed ) {
            throw new ApplicationException("Buffer is closed.");
         }

         _HasCapacity.WaitOne();

         bool signalReaders;

         lock( _ModificationKey ) {
            _Data[_HeadIndex] = value;
            _HeadIndex = (_HeadIndex + 1) % _Data.Length;

            signalReaders = IsEmpty;
            _Count++;
         }

         if( IsFull ) {
            _HasCapacity.Reset();
         }

         if( signalReaders ) {
            _HasData.Set();
         }
      }

At last we come to some meaty bits, where (in this particular case) we write a single byte of data to the buffer.

First we ensure that we haven’t been disposed, and that the buffer hasn’t been closed for writing. Next we ensure that we have capacity for writing to the buffer (if not, we’ll wait until we’re signalled when capacity is available).

The data writing sequence (wrapped in a lock around the modification key) is then: a) write the byte; b) increment the head index; c) check if we need to signal readers (i.e. that before we increment the count, the count was zero, and therefore the _HasData event would be unsignalled); and finally d) increment the count.

After that, we reset the writers’ gate if the buffer is full, and signal the readers if that was determined to be necessary.

      /// <summary>
      /// Reads a byte from the buffer.
      /// </summary>
      /// <returns>The next available byte, or zero (0) if it's empty.</returns>
      public byte ReadByte() {
         ThrowIfDisposed();

         byte value = 0;

         _HasData.WaitOne();

         if( !IsEmpty ) {
            bool signalWriters;

            lock( _ModificationKey ) {
               value = _Data[_TailIndex];
               _TailIndex = (_TailIndex + 1) % _Data.Length;

               signalWriters = IsFull;
               _Count--;
            }

            if( !IsClosed &amp;&amp; (Count == 0) ) {
               _HasData.Reset();
            }

            if( signalWriters ) {
               _HasCapacity.Set();
            }
         }

         return value;
      }

In a very similar fashion to WriteByte, this function will read a single byte from the buffer. The preparatory sequence then is to check that we’re not disposed, and to ensure that we have data (or wait).

The check for IsEmpty might seem confusing at first, but I’ve basically supported the “Flush” scenario (see the Flush function further down) where we release any waiting readers without actually having any data for them to read.

So then, assuming that we do in fact have data, the reading sequence (wrapped again in a lock around the modification key) becomes: a) read the value; b) increment the tail index; c) check if previously the buffer was full (meaning there would be writers waiting); and d) decrement the count.

Then, if we have no more bytes (AND we’re not closed – remember the “Flush” scenario), reset the readers’ gate. Finally, if it was determined to be necessary, signal any waiting writers.

      /// <summary>
      /// Writes the specified data to the head of the RingBuffer.
      /// </summary>
      /// <param name="buffer">The input buffer.</param>
      /// <param name="index">The index of the input data.</param>
      /// <param name="count">The number of bytes to copy.</param>
      public void Write( byte[] buffer, int index, int count ) {
         ThrowIfDisposed();

         if( IsClosed ) {
            throw new ApplicationException("Buffer is closed");
         }

         while( count != 0 ) {
            _HasCapacity.WaitOne();

            bool signalReaders;

            // Gauranteed to not be full at this point, 
            //      however readers may still read from the buffer first.
            lock( _ModificationKey ) {
               int bytesToWrite = Math.Min(Capacity - Count, count);

               signalReaders = (Count == 0);

               while( 0 != bytesToWrite-- ) {
                  _Data[_HeadIndex] = buffer[index];
                  index++;

                  _HeadIndex = (_HeadIndex + 1) % _Data.Length;

                  count--;
                  _Count++;
               }
            }

            if( IsFull ) {
               _HasCapacity.Reset();
            }

            if( signalReaders ) {
               _HasData.Set();
            }
         }
      }

Apart from the introduction of a couple of while loops to manage the writing of a sequence of bytes (as opposed to a single byte), the pattern above is identical to the earlier WriteByte function. It’s worth noting perhaps that the outer loop intends to ensure that we write all the requested bytes, in the case of the inner loop not having enough capacity available at first.

      /// <summary>
      /// Reads data from the tail of the RingBuffer.
      /// </summary>
      /// <param name="buffer">The output buffer.</param>
      /// <param name="index">The index of the output data.</param>
      /// <param name="count">The number of bytes to copy.</param>
      /// <returns>The number of bytes read.</returns>
      public int Read( byte[] buffer, int index, int count ) {
         ThrowIfDisposed();

         int result = 0;

         while( count != 0 ) {
            _HasData.WaitOne();

            if( IsEmpty ) {
               count = 0;

            } else {
               lock( _ModificationKey ) {
                  int toRead = Math.Min(Count, count);

                  result += toRead;

                  bool signalWriters = IsFull;

                  while( 0 != toRead-- ) {
                     buffer[index] = _Data[_TailIndex];
                     index++;

                     _TailIndex = (_TailIndex + 1) % _Data.Length;

                     count--;
                     _Count--;
                  }

                  if( !IsClosed &amp;&amp; (Count == 0) ) {
                     _HasData.Reset();
                  }

                  if( signalWriters ) {
                     _HasCapacity.Set();
                  }
               }
            }
         }

         return result;
      }

Again, a very similar pattern to ReadByte, with additions along the lines of those introduced in WriteBytes intended to manage the reading of a sequence of bytes. As before, our outer loop intends to ensure we read all the requested bytes, in the case that the inner loop may not have had all the bytes available at first.

      /// <summary>
      /// Close the buffer for writing, and flush any waiting readers.
      /// </summary>
      /// <remarks>
      ///   A Read when the buffer is closed and there is no data will return -1.
      /// </remarks>
      public void Flush() {
         ThrowIfDisposed();
         IsClosed = true;
         _HasData.Set();
      }

      /// <summary>
      /// Clear the buffer contents, and reset the <see cref="IsClosed"/> status.
      /// </summary>
      public void Reset() {
         ThrowIfDisposed();

         _TailIndex = 0;
         _HeadIndex = 0;
         _Count = 0;

         Array.Clear(_Data, 0, _Data.Length);

         _HasData.Reset();
         _HasCapacity.Set();

         IsClosed = false;
      }

Like I mentioned earlier, the Flush function is intended to allow the “release” of any threads potentially waiting on the buffer for signals, when it is clear that those signals would not be forthcoming. This is, of course, an implementation detail that would be the responsibility of the code that is consuming the buffer.

Conversely, the Reset function aims to allow the restoration of the buffer into a usable state after it had been “Flushed”. Note though that this will (quite correctly) not be allowed if the object had been disposed.

      /// <summary>
      /// This method disposes all <see cref="IDisposable"/> fields.
      /// </summary>
      protected override void DisposeManagedFields() {
         base.DisposeManagedFields();

         _HasCapacity.Close();
         ((IDisposable)_HasCapacity).Dispose();

         _HasData.Close();
         ((IDisposable)_HasData).Dispose();
      }
   }
}

Finally, we’ll clean up our disposable fields on the override from the Disposable base class.

Download the code here.