dotnet-io-pipelines


High-performance I/O patterns using `System.IO.Pipelines`. Covers `PipeReader`, `PipeWriter`, backpressure management, protocol parser implementation, and Kestrel integration. Pipelines solve the classic problems of buffer management, incomplete reads, and memory copying that plague traditional stream-based network code.

Scope


  • PipeReader/PipeWriter patterns and backpressure management
  • Protocol parser implementation with ReadOnlySequence
  • Kestrel integration and custom transports
  • Buffer management and SequencePosition bookmarks

Out of scope


  • Async/await fundamentals and ValueTask patterns -- see [skill:dotnet-csharp-async-patterns]
  • Benchmarking methodology and Span<T> micro-optimization -- see [skill:dotnet-performance-patterns]
  • File-based I/O (FileStream, RandomAccess, MemoryMappedFile) -- see [skill:dotnet-file-io]
Cross-references: [skill:dotnet-csharp-async-patterns] for async patterns used in pipeline loops, [skill:dotnet-performance-patterns] for Span/Memory optimization techniques, [skill:dotnet-file-io] for file-based I/O patterns (FileStream, RandomAccess, MemoryMappedFile).


Why Pipelines Over Streams


Traditional `Stream`-based I/O forces developers to manage buffers manually, handle partial reads, and copy data between buffers. `System.IO.Pipelines` solves these problems:

| Problem | Stream approach | Pipeline approach |
| --- | --- | --- |
| Buffer management | Allocate `byte[]`, resize manually | Automatic pooled buffer management |
| Partial reads | Track position, concatenate fragments | `ReadResult` with `SequencePosition` bookmarks |
| Backpressure | None -- writer can outpace reader | Built-in pause/resume thresholds |
| Memory copies | Copy between buffers at each layer | Zero-copy slicing with `ReadOnlySequence<byte>` |
| Lifetime management | Manual `byte[]` lifecycle | Pooled memory returned on `AdvanceTo` |

The `Pipe` class connects a `PipeWriter` (producer) and a `PipeReader` (consumer) with an internal buffer pool, flow control, and completion signaling.


Core Concepts


Pipe, PipeReader, PipeWriter


```csharp
// Create a pipe with default options (uses ArrayPool internally)
var pipe = new Pipe();

PipeWriter writer = pipe.Writer;  // Producer side
PipeReader reader = pipe.Reader;  // Consumer side
```
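The two sides are typically run concurrently. A minimal self-contained sketch, using a `MemoryStream` in place of a real network stream (the `FillAsync`/`DrainAsync` names are illustrative):

```csharp
using System;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

// A MemoryStream stands in for a network stream; producer and
// consumer run concurrently against the same Pipe.
var pipe = new Pipe();
using var source = new MemoryStream(Encoding.UTF8.GetBytes("hello pipelines"));

Task writing = FillAsync(source, pipe.Writer);
Task<string> reading = DrainAsync(pipe.Reader);

await writing;
Console.WriteLine(await reading);  // hello pipelines

static async Task FillAsync(Stream source, PipeWriter writer)
{
    while (true)
    {
        Memory<byte> memory = writer.GetMemory(512);   // rent a pooled buffer
        int n = await source.ReadAsync(memory);
        if (n == 0) break;                             // end of stream
        writer.Advance(n);                             // commit written bytes
        FlushResult flushed = await writer.FlushAsync();
        if (flushed.IsCompleted) break;                // reader stopped consuming
    }
    await writer.CompleteAsync();
}

static async Task<string> DrainAsync(PipeReader reader)
{
    var sb = new StringBuilder();
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        foreach (ReadOnlyMemory<byte> segment in result.Buffer)
            sb.Append(Encoding.UTF8.GetString(segment.Span));
        reader.AdvanceTo(result.Buffer.End);           // everything consumed
        if (result.IsCompleted) break;
    }
    await reader.CompleteAsync();
    return sb.ToString();
}
```

Both loops mirror the `FillPipeAsync`/`ReadPipeAsync` patterns detailed in the following sections.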

PipeWriter -- Producing Data


```csharp
async Task FillPipeAsync(Stream source, PipeWriter writer,
    CancellationToken ct)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Request a buffer from the pipe's memory pool
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);

        int bytesRead = await source.ReadAsync(memory, ct);
        if (bytesRead == 0)
            break;  // End of stream

        // Tell the pipe how many bytes were written
        writer.Advance(bytesRead);

        // Flush makes data available to the reader.
        // FlushAsync may pause here if the reader is slow (backpressure).
        FlushResult result = await writer.FlushAsync(ct);
        if (result.IsCompleted)
            break;  // Reader stopped consuming
    }

    // Signal completion -- reader will see IsCompleted = true
    await writer.CompleteAsync();
}
```

Critical rules:
  • Call `GetMemory` or `GetSpan` before writing -- never write to a previously obtained buffer after `Advance`
  • Call `Advance` with the exact number of bytes written
  • Call `FlushAsync` to make data available to the reader and to respect backpressure

PipeReader -- Consuming Data


```csharp
async Task ReadPipeAsync(PipeReader reader, CancellationToken ct)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync(ct);
        ReadOnlySequence<byte> buffer = result.Buffer;

        // Try to parse messages from the buffer
        while (TryParseMessage(ref buffer, out var message))
        {
            await ProcessMessageAsync(message, ct);
        }

        // Tell the pipe how much was consumed and how much was examined.
        // consumed: data that has been fully processed (will be freed)
        // examined: data that has been looked at (won't trigger re-read
        //           until new data arrives)
        reader.AdvanceTo(buffer.Start, buffer.End);

        if (result.IsCompleted)
            break;  // Writer finished and all data consumed
    }

    await reader.CompleteAsync();
}
```

Critical rules:
  • Always call `AdvanceTo` after `ReadAsync` -- failing to do so leaks memory
  • Pass both `consumed` and `examined` positions: `consumed` frees memory, `examined` prevents busy-wait when the buffer has been scanned but does not contain a complete message
  • Never access `ReadResult.Buffer` after calling `AdvanceTo` -- the memory may be recycled


Backpressure


Backpressure prevents fast producers from overwhelming slow consumers. The pipe pauses the writer when unread data exceeds a threshold.

PipeOptions Configuration


```csharp
var pipe = new Pipe(new PipeOptions(
    pauseWriterThreshold: 64 * 1024,   // Pause writer at 64 KB buffered
    resumeWriterThreshold: 32 * 1024,  // Resume writer when buffer drops to 32 KB
    minimumSegmentSize: 4096,
    useSynchronizationContext: false));
```

| Option | Default | Purpose |
| --- | --- | --- |
| `PauseWriterThreshold` | 65,536 | `FlushAsync` pauses when unread bytes exceed this |
| `ResumeWriterThreshold` | 32,768 | `FlushAsync` resumes when unread bytes drop below this |
| `MinimumSegmentSize` | 4,096 | Minimum buffer segment allocation size |
| `UseSynchronizationContext` | true | Set `false` for server code to avoid context captures |

How Backpressure Works


  1. Writer calls `FlushAsync` after `Advance`
  2. If buffered (unread) data exceeds `PauseWriterThreshold`, `FlushAsync` does not complete until the reader consumes enough data to drop below `ResumeWriterThreshold`
  3. The writer is effectively paused -- no busy-waiting, no exceptions, just an awaitable that completes when the reader catches up

This prevents unbounded memory growth when a producer (network socket, file) is faster than the consumer (parser, business logic).
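A small self-contained demonstration of the pause, with thresholds shrunk far below the defaults purely for illustration:

```csharp
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;

// Tiny thresholds make the pause easy to observe.
var pipe = new Pipe(new PipeOptions(
    pauseWriterThreshold: 16,
    resumeWriterThreshold: 8,
    useSynchronizationContext: false));

pipe.Writer.GetSpan(32);
pipe.Writer.Advance(32);                         // 32 unread bytes > pause threshold

ValueTask<FlushResult> flush = pipe.Writer.FlushAsync();
bool paused = !flush.IsCompleted;
Console.WriteLine(paused);                       // True -- the writer is paused

ReadResult result = await pipe.Reader.ReadAsync();
pipe.Reader.AdvanceTo(result.Buffer.End);        // unread drops to 0 < resume threshold
await flush;                                     // now completes
Console.WriteLine("writer resumed");
```

The incomplete `ValueTask<FlushResult>` is the backpressure signal: no exception, just an awaitable that finishes once the reader drains below the resume threshold.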


Protocol Parsing


Pipelines excel at parsing binary protocols because `ReadOnlySequence<byte>` handles fragmented data across multiple buffer segments without copying.

Length-Prefixed Protocol Parser


A common pattern: each message starts with a 4-byte big-endian length header followed by the payload.

```csharp
static bool TryParseMessage(
    ref ReadOnlySequence<byte> buffer,
    out ReadOnlySequence<byte> payload)
{
    payload = default;

    // Need at least 4 bytes for the length prefix
    if (buffer.Length < 4)
        return false;

    // Read length from first 4 bytes
    int length;
    if (buffer.FirstSpan.Length >= 4)
    {
        length = BinaryPrimitives.ReadInt32BigEndian(buffer.FirstSpan);
    }
    else
    {
        // Slow path: length header spans multiple segments
        Span<byte> lengthBytes = stackalloc byte[4];
        buffer.Slice(0, 4).CopyTo(lengthBytes);
        length = BinaryPrimitives.ReadInt32BigEndian(lengthBytes);
    }

    // Validate length to prevent allocation attacks
    if (length < 0 || length > 1_048_576)  // 1 MB max
        throw new ProtocolViolationException(
            $"Invalid message length: {length}");

    // Check if the full message is available
    long totalLength = 4 + length;
    if (buffer.Length < totalLength)
        return false;

    // Extract the payload (zero-copy slice)
    payload = buffer.Slice(4, length);

    // Advance the buffer past this message
    buffer = buffer.Slice(totalLength);
    return true;
}
```
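A usage sketch for the parser above (it assumes the `TryParseMessage` method is in scope): frame a payload with a 4-byte big-endian length prefix, then parse it back out.

```csharp
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Text;

// Build a frame: 4-byte big-endian length header, then the payload.
byte[] payload = Encoding.UTF8.GetBytes("ping");
byte[] frame = new byte[4 + payload.Length];
BinaryPrimitives.WriteInt32BigEndian(frame, payload.Length);
payload.CopyTo(frame, 4);

// Parse it back out of a single-segment sequence.
var buffer = new ReadOnlySequence<byte>(frame);
bool parsed = TryParseMessage(ref buffer, out ReadOnlySequence<byte> message);

Console.WriteLine(parsed);                                      // True
Console.WriteLine(Encoding.UTF8.GetString(message.ToArray())); // ping
Console.WriteLine(buffer.Length);                               // 0 -- frame fully consumed
```

Note that `message` is a zero-copy slice of `frame`; `ToArray` is used here only to decode it for display.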

Delimiter-Based Protocol Parser (Line Protocol)


```csharp
static bool TryReadLine(
    ref ReadOnlySequence<byte> buffer,
    out ReadOnlySequence<byte> line)
{
    // Look for the newline delimiter
    SequencePosition? position = buffer.PositionOf((byte)'\n');
    if (position is null)
    {
        line = default;
        return false;
    }

    // Slice up to (not including) the delimiter
    line = buffer.Slice(0, position.Value);

    // Advance past the delimiter
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}
```
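A usage sketch, assuming the `TryReadLine` method above is in scope: complete lines are extracted, and the trailing partial line stays in the buffer until more data arrives.

```csharp
using System;
using System.Buffers;
using System.Text;

// Two complete lines followed by an incomplete one.
var buffer = new ReadOnlySequence<byte>(
    Encoding.ASCII.GetBytes("GET /\nHost: example\npartial"));

while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
    Console.WriteLine(Encoding.ASCII.GetString(line.ToArray()));
// GET /
// Host: example

Console.WriteLine(buffer.Length);  // 7 -- the incomplete "partial" stays buffered
```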

Working with ReadOnlySequence<byte>


`ReadOnlySequence<byte>` may span multiple non-contiguous memory segments. Handle both paths:

```csharp
static string DecodeUtf8(ReadOnlySequence<byte> sequence)
{
    // Fast path: single contiguous segment
    if (sequence.IsSingleSegment)
    {
        return Encoding.UTF8.GetString(sequence.FirstSpan);
    }

    // Slow path: multi-segment -- rent a contiguous buffer
    int length = (int)sequence.Length;
    byte[] rented = ArrayPool<byte>.Shared.Rent(length);
    try
    {
        sequence.CopyTo(rented);
        return Encoding.UTF8.GetString(rented, 0, length);
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(rented);
    }
}
```


Stream Adapter


Bridge `System.IO.Pipelines` with existing `Stream`-based APIs using `PipeReader.Create` and `PipeWriter.Create`.

```csharp
// Wrap a NetworkStream for pipeline-based reading
await using var networkStream = tcpClient.GetStream();
var reader = PipeReader.Create(networkStream, new StreamPipeReaderOptions(
    bufferSize: 4096,
    minimumReadSize: 1024,
    leaveOpen: true)); // Caller manages networkStream lifetime

try
{
    await ProcessProtocolAsync(reader, cancellationToken);
}
finally
{
    await reader.CompleteAsync();
}
```

```csharp
// Wrap a stream for pipeline-based writing
var writer = PipeWriter.Create(networkStream, new StreamPipeWriterOptions(
    minimumBufferSize: 4096,
    leaveOpen: true)); // Caller manages networkStream lifetime

try
{
    await WriteResponseAsync(writer, response, cancellationToken);
}
finally
{
    await writer.CompleteAsync();
}
```


Kestrel Integration


ASP.NET Core's Kestrel web server uses
System.IO.Pipelines
internally for HTTP request/response processing. Custom connection middleware can access the transport-level pipe directly.

Connection Middleware


```csharp
// Custom connection middleware for protocol-level processing
builder.WebHost.ConfigureKestrel(options =>
{
    options.ListenLocalhost(9000, listenOptions =>
    {
        listenOptions.UseConnectionHandler<MyProtocolHandler>();
    });
});

public sealed class MyProtocolHandler : ConnectionHandler
{
    public override async Task OnConnectedAsync(
        ConnectionContext connection)
    {
        var reader = connection.Transport.Input;
        var writer = connection.Transport.Output;
        var ct = connection.ConnectionClosed;

        try
        {
            while (true)
            {
                ReadResult result = await reader.ReadAsync(ct);
                ReadOnlySequence<byte> buffer = result.Buffer;

                while (TryParseMessage(ref buffer, out var payload))
                {
                    var response = ProcessRequest(payload);
                    await WriteResponseAsync(writer, response);
                }

                reader.AdvanceTo(buffer.Start, buffer.End);

                if (result.IsCompleted)
                    break;
            }
        }
        finally
        {
            await reader.CompleteAsync();
            await writer.CompleteAsync();
        }
    }

    private static async Task WriteResponseAsync(
        PipeWriter writer, ReadOnlyMemory<byte> response)
    {
        // Write length prefix + payload
        var memory = writer.GetMemory(4 + response.Length);
        BinaryPrimitives.WriteInt32BigEndian(
            memory.Span, response.Length);
        response.CopyTo(memory[4..]);
        writer.Advance(4 + response.Length);
        await writer.FlushAsync();
    }
}
```

IDuplexPipe


Kestrel exposes connections as `IDuplexPipe`, combining `PipeReader` and `PipeWriter` into a single transport abstraction. This pattern also works for custom TCP servers, WebSocket handlers, and named-pipe protocols.

```csharp
public interface IDuplexPipe
{
    PipeReader Input { get; }
    PipeWriter Output { get; }
}
```
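For unit-testing protocol handlers without a socket, two `Pipe`s can be wired back-to-back behind a hand-rolled `IDuplexPipe`. The `InMemoryDuplexPipe` type below is illustrative, not a framework class:

```csharp
using System;
using System.IO.Pipelines;

var clientToServer = new Pipe();
var serverToClient = new Pipe();

// Each side reads from one pipe and writes to the other.
IDuplexPipe serverSide = new InMemoryDuplexPipe(clientToServer.Reader, serverToClient.Writer);
IDuplexPipe clientSide = new InMemoryDuplexPipe(serverToClient.Reader, clientToServer.Writer);

// Round-trip one byte: client writes (WriteAsync also flushes), server reads.
await clientSide.Output.WriteAsync(new byte[] { 42 });
ReadResult result = await serverSide.Input.ReadAsync();
Console.WriteLine(result.Buffer.FirstSpan[0]);  // 42
serverSide.Input.AdvanceTo(result.Buffer.End);

// Illustrative helper -- not part of the framework.
sealed class InMemoryDuplexPipe : IDuplexPipe
{
    public InMemoryDuplexPipe(PipeReader input, PipeWriter output)
        => (Input, Output) = (input, output);

    public PipeReader Input { get; }
    public PipeWriter Output { get; }
}
```

A handler written against `IDuplexPipe` (or `ConnectionContext.Transport`) can then be exercised entirely in memory.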


Performance Tips


  1. Minimize copies -- use `ReadOnlySequence<byte>` slicing instead of copying to `byte[]`. Parse directly from the sequence when possible.
  2. Use `GetSpan`/`GetMemory` correctly -- request the minimum size you need. The pipe may return a larger buffer, which is fine. Do not cache the returned `Span`/`Memory` across `Advance`/`FlushAsync` calls.
  3. Set `useSynchronizationContext: false` -- server code should never capture the synchronization context. `PipeOptions` defaults this to `true`, so set it explicitly in server code.
  4. Tune pause/resume thresholds -- the defaults (64 KB / 32 KB) work for most scenarios. Increase them for high-throughput bulk transfer; decrease them for low-latency interactive protocols.
  5. Prefer `SequenceReader<byte>` -- for complex parsing, `SequenceReader<byte>` provides `TryRead`, `TryReadBigEndian`, `AdvancePast`, and `IsNext` methods that handle multi-segment sequences transparently.

```csharp
static bool TryParseHeader(
    ref ReadOnlySequence<byte> buffer,
    out int messageType,
    out int length)
{
    var reader = new SequenceReader<byte>(buffer);

    if (!reader.TryRead(out byte typeByte) ||
        !reader.TryReadBigEndian(out int len))
    {
        messageType = 0;
        length = 0;
        return false;
    }

    messageType = typeByte;
    length = len;
    buffer = buffer.Slice(reader.Position);
    return true;
}
```


Agent Gotchas


  1. Do not forget to call `AdvanceTo` after `ReadAsync` -- skipping `AdvanceTo` leaks pooled memory and eventually causes `OutOfMemoryException`. Every `ReadAsync` must be paired with an `AdvanceTo`.
  2. Do not access `ReadResult.Buffer` after calling `AdvanceTo` -- the underlying memory segments may be returned to the pool. Copy or parse all needed data before advancing.
  3. Do not set `consumed` equal to `examined` when no complete message was found -- this creates a busy-wait loop. Set `consumed` to `buffer.Start` (nothing consumed) and `examined` to `buffer.End` (everything examined) so the pipe waits for new data.
  4. Do not ignore `FlushResult.IsCompleted` -- it means the reader has stopped consuming. If you continue writing after this, the data will be silently discarded.
  5. Do not use `Pipe` for simple stream-to-stream copying -- `Stream.CopyToAsync` is simpler and equally efficient. Use pipelines when you need parsing, backpressure, or zero-copy slicing.
  6. Do not use `BinaryPrimitives` methods on spans shorter than required -- always check `buffer.Length` before reading fixed-width values to avoid `ArgumentOutOfRangeException`.


Knowledge Sources


References
