为什么不能直接用 ConcurrentQueue<t></t>
代替环形缓冲区
因为
ConcurrentQueue<t></t>是链表实现,内存不连续,无法保证写入/读取的原子性批次,也不支持预分配固定大小和零拷贝访问。环形缓冲区核心价值在于:确定容量、缓存友好、单生产者/单消费者(SPSC)场景下免锁、支持指针快速读写。如果你需要的是高吞吐低延迟的日志暂存、网络包收发或实时音频流缓冲,
ConcurrentQueue的 GC 压力和间接寻址开销会成为瓶颈。
如何用 Interlocked
实现 SPSC 无锁环形缓冲区
关键不是“完全不用锁”,而是避免
lock语句阻塞线程;SPSC 场景下,仅用
Interlocked.CompareExchange和
Interlocked.Add即可协调读写索引。必须满足:一个线程只写、一个线程只读,且不允许多对一或一对多。 缓冲区底层数组用
T[]预分配,长度为 2 的幂(便于位运算取模) 写索引(
_writeIndex)和读索引(
_readIndex)均为
long类型,避免 32 位溢出导致误判 实际位置用
index & (_capacity - 1)计算,比
% _capacity快且安全 写操作前先用
Interlocked.CompareExchange检查是否有足够空位,失败则返回
false(不阻塞) 读操作同理,检查是否有数据可读,再用
Interlocked.Add批量推进读索引
public sealed class RingBuffer<T>
{
private readonly T[] _buffer;
private readonly int _capacity;
private readonly int _mask;
private long _writeIndex;
private long _readIndex;
<pre class='brush:php;toolbar:false;'>public RingBuffer(int capacity)
{
_capacity = RoundUpToPowerOfTwo(capacity);
_mask = _capacity - 1;
_buffer = new T[_capacity];
}
public bool TryWrite(T item)
{
long writePos = Interlocked.Read(ref _writeIndex);
long readPos = Interlocked.Read(ref _readIndex);
long available = _capacity - (writePos - readPos);
if (available <= 0) return false;
_buffer[writePos & _mask] = item;
Interlocked.Increment(ref _writeIndex);
return true;
}
public bool TryRead(out T item)
{
item = default!;
long writePos = Interlocked.Read(ref _writeIndex);
long readPos = Interlocked.Read(ref _readIndex);
if (writePos == readPos) return false;
item = _buffer[readPos & _mask];
Interlocked.Increment(ref _readIndex);
return true;
}
private static int RoundUpToPowerOfTwo(int v)
{
v--;
v |= v >> 1;
v |= v >> 2;
v |= v >> 4;
v |= v >> 8;
v |= v >> 16;
return v + 1;
}}
为什么 volatile
不够,而必须用 Interlocked.Read
在 x86/x64 上,
volatile字段读写会插入内存屏障,但不能保证“读-改-写”操作的原子性。比如两个线程同时执行
_writeIndex++,即使字段是
volatile,仍可能丢失一次自增。而
Interlocked.Read(ref _writeIndex)不仅保证读取最新值,还强制刷新 CPU 缓存行,确保你看到的是其他线程写入后的结果。尤其在 ARM 平台上,缺少
Interlocked会导致读写索引严重错乱。
容易被忽略的边界:批量读写与内存可见性
上面示例是单元素读写,实际中常需
TryWriteBatch或
TryReadBatch。这时不能简单循环调用
TryWrite,否则每轮都重复检查可用空间,效率低且逻辑错乱。正确做法是:一次计算最大可写数量,用
Interlocked.CompareExchange原子预留位置,再逐个赋值,最后用
Interlocked.Add提交写索引偏移。同样,读端也要先确认数据量,再批量复制,最后提交读索引——否则中间被写端覆盖就丢数据了。
另外,如果
T是引用类型,写入时只是存引用,不触发对象复制;但若
T是结构体且较大(如超过 16 字节),要考虑缓存行对齐和复制开销。无锁结构体写入本身没问题,但频繁大结构体搬运会抵消无锁带来的性能优势。
