dotnet-io-pipelines
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
Chinesedotnet-io-pipelines
dotnet-io-pipelines
High-performance I/O patterns using . Covers , , backpressure management, protocol parser implementation, and Kestrel integration. Pipelines solve the classic problems of buffer management, incomplete reads, and memory copying that plague traditional stream-based network code.
System.IO.PipelinesPipeReaderPipeWriter使用实现高性能I/O模式。涵盖、、背压管理、协议解析器实现以及与Kestrel的集成。Pipelines解决了传统基于流的网络代码中存在的缓冲区管理、不完整读取和内存复制等经典问题。
System.IO.PipelinesPipeReaderPipeWriterScope
适用范围
- PipeReader/PipeWriter patterns and backpressure management
- Protocol parser implementation with ReadOnlySequence
- Kestrel integration and custom transports
- Buffer management and SequencePosition bookmarks
- PipeReader/PipeWriter模式与背压管理
- 基于ReadOnlySequence的协议解析器实现
- Kestrel集成与自定义传输
- 缓冲区管理与SequencePosition书签
Out of scope
不包含内容
- Async/await fundamentals and ValueTask patterns -- see [skill:dotnet-csharp-async-patterns]
- Benchmarking methodology and Span<T> micro-optimization -- see [skill:dotnet-performance-patterns]
- File-based I/O (FileStream, RandomAccess, MemoryMappedFile) -- see [skill:dotnet-file-io]
Cross-references: [skill:dotnet-csharp-async-patterns] for async patterns used in pipeline loops, [skill:dotnet-performance-patterns] for Span/Memory optimization techniques, [skill:dotnet-file-io] for file-based I/O patterns (FileStream, RandomAccess, MemoryMappedFile).
- Async/await基础与ValueTask模式 -- 参考[skill:dotnet-csharp-async-patterns]
- 基准测试方法与Span<T>微优化 -- 参考[skill:dotnet-performance-patterns]
- 基于文件的I/O(FileStream、RandomAccess、MemoryMappedFile) -- 参考[skill:dotnet-file-io]
交叉引用:管道循环中使用的异步模式请参考[skill:dotnet-csharp-async-patterns],Span/Memory优化技术请参考[skill:dotnet-performance-patterns],基于文件的I/O模式(FileStream、RandomAccess、MemoryMappedFile)请参考[skill:dotnet-file-io]。
Why Pipelines Over Streams
为何选择Pipelines而非Streams
Traditional -based I/O forces developers to manage buffers manually, handle partial reads, and copy data between buffers. solves these problems:
StreamSystem.IO.Pipelines| Problem | Stream Approach | Pipeline Approach |
|---|---|---|
| Buffer management | Allocate | Automatic pooled buffer management |
| Partial reads | Track position, concatenate fragments | |
| Backpressure | None -- writer can outpace reader | Built-in pause/resume thresholds |
| Memory copies | Copy between buffers at each layer | Zero-copy slicing with |
| Lifetime management | Manual | Pooled memory returned on |
The class connects a (producer) and a (consumer) with an internal buffer pool, flow control, and completion signaling.
PipePipeWriterPipeReader传统基于的I/O要求开发者手动管理缓冲区、处理部分读取以及在缓冲区之间复制数据。解决了这些问题:
StreamSystem.IO.Pipelines| 问题 | Stream方案 | Pipeline方案 |
|---|---|---|
| 缓冲区管理 | 手动分配 | 自动池化缓冲区管理 |
| 不完整读取 | 跟踪位置,拼接数据片段 | 带有 |
| 背压处理 | 无 -- 写入端可能远超读取端速度 | 内置暂停/恢复阈值 |
| 内存复制 | 每层之间在缓冲区复制数据 | 使用 |
| 生命周期管理 | 手动管理 | 调用 |
PipePipeWriterPipeReaderCore Concepts
核心概念
Pipe, PipeReader, PipeWriter
Pipe、PipeReader、PipeWriter
csharp
// Create a pipe with default options (uses ArrayPool internally)
var pipe = new Pipe();
PipeWriter writer = pipe.Writer; // Producer side
PipeReader reader = pipe.Reader; // Consumer sidecsharp
// 使用默认选项创建管道(内部使用ArrayPool)
var pipe = new Pipe();
PipeWriter writer = pipe.Writer; // 生产者端
PipeReader reader = pipe.Reader; // 消费者端PipeWriter -- Producing Data
PipeWriter -- 生成数据
csharp
async Task FillPipeAsync(Stream source, PipeWriter writer,
CancellationToken ct)
{
const int minimumBufferSize = 512;
while (true)
{
// Request a buffer from the pipe's memory pool
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
int bytesRead = await source.ReadAsync(memory, ct);
if (bytesRead == 0)
break; // End of stream
// Tell the pipe how many bytes were written
writer.Advance(bytesRead);
// Flush makes data available to the reader.
// FlushAsync may pause here if the reader is slow (backpressure).
FlushResult result = await writer.FlushAsync(ct);
if (result.IsCompleted)
break; // Reader stopped consuming
}
// Signal completion -- reader will see IsCompleted = true
await writer.CompleteAsync();
}Critical rules:
- Call or
GetMemorybefore writing -- never write to a previously obtained buffer afterGetSpanAdvance - Call with the exact number of bytes written
Advance - Call to make data available to the reader and to respect backpressure
FlushAsync
csharp
async Task FillPipeAsync(Stream source, PipeWriter writer,
CancellationToken ct)
{
const int minimumBufferSize = 512;
while (true)
{
// 从管道的内存池中请求缓冲区
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
int bytesRead = await source.ReadAsync(memory, ct);
if (bytesRead == 0)
break; // 流结束
// 告知管道已写入的字节数
writer.Advance(bytesRead);
// Flush操作让数据对读取端可见。
// 如果读取端速度较慢,FlushAsync会在此处暂停(背压机制)。
FlushResult result = await writer.FlushAsync(ct);
if (result.IsCompleted)
break; // 读取端停止消费
}
// 发送完成信号 -- 读取端会看到IsCompleted = true
await writer.CompleteAsync();
}关键规则:
- 写入前调用或
GetMemory-- 调用GetSpan后绝不能写入之前获取的缓冲区Advance - 调用时传入准确的已写字节数
Advance - 调用让数据对读取端可见并遵循背压机制
FlushAsync
PipeReader -- Consuming Data
PipeReader -- 消费数据
csharp
async Task ReadPipeAsync(PipeReader reader, CancellationToken ct)
{
while (true)
{
ReadResult result = await reader.ReadAsync(ct);
ReadOnlySequence<byte> buffer = result.Buffer;
// Try to parse messages from the buffer
while (TryParseMessage(ref buffer, out var message))
{
await ProcessMessageAsync(message, ct);
}
// Tell the pipe how much was consumed and how much was examined.
// consumed: data that has been fully processed (will be freed)
// examined: data that has been looked at (won't trigger re-read
// until new data arrives)
reader.AdvanceTo(buffer.Start, buffer.End);
if (result.IsCompleted)
break; // Writer finished and all data consumed
}
await reader.CompleteAsync();
}Critical rules:
- Always call after
AdvanceTo-- failing to do so leaks memoryReadAsync - Pass both and
consumedpositions:examinedfrees memory,consumedprevents busy-wait when the buffer has been scanned but does not contain a complete messageexamined - Never access after calling
ReadResult.Buffer-- the memory may be recycledAdvanceTo
csharp
async Task ReadPipeAsync(PipeReader reader, CancellationToken ct)
{
while (true)
{
ReadResult result = await reader.ReadAsync(ct);
ReadOnlySequence<byte> buffer = result.Buffer;
// 尝试从缓冲区解析消息
while (TryParseMessage(ref buffer, out var message))
{
await ProcessMessageAsync(message, ct);
}
// 告知管道已消费和已检查的数据量。
// consumed: 已完全处理的数据(将被释放)
// examined: 已检查的数据(在新数据到达前不会触发重新读取)
reader.AdvanceTo(buffer.Start, buffer.End);
if (result.IsCompleted)
break; // 写入端完成且所有数据已被消费
}
await reader.CompleteAsync();
}关键规则:
- 调用后必须调用
ReadAsync-- 否则会导致内存泄漏AdvanceTo - 同时传入和
consumed位置:examined释放内存,consumed避免缓冲区已扫描但无完整消息时的忙等待examined - 调用后绝不能访问
AdvanceTo-- 内存可能已被回收ReadResult.Buffer
Backpressure
背压处理
Backpressure prevents fast producers from overwhelming slow consumers. The pipe pauses the writer when unread data exceeds a threshold.
背压机制防止快速生产者压垮慢速消费者。当未读数据超过阈值时,管道会暂停写入端。
PipeOptions Configuration
PipeOptions配置
csharp
var pipe = new Pipe(new PipeOptions(
pauseWriterThreshold: 64 * 1024, // Pause writer at 64 KB buffered
resumeWriterThreshold: 32 * 1024, // Resume writer when buffer drops to 32 KB
minimumSegmentSize: 4096,
useSynchronizationContext: false));| Option | Default | Purpose |
|---|---|---|
| 65,536 | |
| 32,768 | |
| 4,096 | Minimum buffer segment allocation size |
| | Set |
csharp
var pipe = new Pipe(new PipeOptions(
pauseWriterThreshold: 64 * 1024, // 缓冲数据达到64 KB时暂停写入端
resumeWriterThreshold: 32 * 1024, // 缓冲数据降至32 KB时恢复写入端
minimumSegmentSize: 4096,
useSynchronizationContext: false));| 选项 | 默认值 | 用途 |
|---|---|---|
| 65,536 | 未读字节数超过此值时, |
| 32,768 | 未读字节数低于此值时, |
| 4,096 | 缓冲区段的最小分配大小 |
| | 服务器代码设为 |
How Backpressure Works
背压机制工作原理
- Writer calls after
FlushAsyncAdvance - If buffered (unread) data exceeds ,
PauseWriterThresholddoes not complete until the reader consumes enough data to drop belowFlushAsyncResumeWriterThreshold - The writer is effectively paused -- no busy-waiting, no exceptions, just an awaitable that completes when the reader catches up
This prevents unbounded memory growth when a producer (network socket, file) is faster than the consumer (parser, business logic).
- 写入端在后调用
AdvanceFlushAsync - 如果缓冲(未读)数据超过,
PauseWriterThreshold会暂停,直到读取端消费足够数据使缓冲降至FlushAsync以下ResumeWriterThreshold - 写入端会被有效暂停 -- 无忙等待,无异常,仅当读取端跟上进度时,可等待对象才会完成
这避免了生产者(网络套接字、文件)速度快于消费者(解析器、业务逻辑)时的无限制内存增长。
Protocol Parsing
协议解析
Pipelines excel at parsing binary protocols because handles fragmented data across multiple buffer segments without copying.
ReadOnlySequence<byte>Pipelines在解析二进制协议方面表现出色,因为无需复制即可处理跨多个缓冲区段的碎片化数据。
ReadOnlySequence<byte>Length-Prefixed Protocol Parser
长度前缀协议解析器
A common pattern: each message starts with a 4-byte big-endian length header followed by the payload.
csharp
static bool TryParseMessage(
ref ReadOnlySequence<byte> buffer,
out ReadOnlySequence<byte> payload)
{
payload = default;
// Need at least 4 bytes for the length prefix
if (buffer.Length < 4)
return false;
// Read length from first 4 bytes
int length;
if (buffer.FirstSpan.Length >= 4)
{
length = BinaryPrimitives.ReadInt32BigEndian(buffer.FirstSpan);
}
else
{
// Slow path: length header spans multiple segments
Span<byte> lengthBytes = stackalloc byte[4];
buffer.Slice(0, 4).CopyTo(lengthBytes);
length = BinaryPrimitives.ReadInt32BigEndian(lengthBytes);
}
// Validate length to prevent allocation attacks
if (length < 0 || length > 1_048_576) // 1 MB max
throw new ProtocolViolationException(
$"Invalid message length: {length}");
// Check if the full message is available
long totalLength = 4 + length;
if (buffer.Length < totalLength)
return false;
// Extract the payload (zero-copy slice)
payload = buffer.Slice(4, length);
// Advance the buffer past this message
buffer = buffer.Slice(totalLength);
return true;
}常见模式:每条消息以4字节大端序长度头开头,后跟负载数据。
csharp
static bool TryParseMessage(
ref ReadOnlySequence<byte> buffer,
out ReadOnlySequence<byte> payload)
{
payload = default;
// 至少需要4字节获取长度前缀
if (buffer.Length < 4)
return false;
// 从开头4字节读取长度
int length;
if (buffer.FirstSpan.Length >= 4)
{
length = BinaryPrimitives.ReadInt32BigEndian(buffer.FirstSpan);
}
else
{
// 慢路径:长度头跨多个段
Span<byte> lengthBytes = stackalloc byte[4];
buffer.Slice(0, 4).CopyTo(lengthBytes);
length = BinaryPrimitives.ReadInt32BigEndian(lengthBytes);
}
// 验证长度以防止分配攻击
if (length < 0 || length > 1_048_576) // 最大1 MB
throw new ProtocolViolationException(
$"无效消息长度: {length}");
// 检查完整消息是否可用
long totalLength = 4 + length;
if (buffer.Length < totalLength)
return false;
// 提取负载数据(零拷贝切片)
payload = buffer.Slice(4, length);
// 将缓冲区推进到当前消息之后
buffer = buffer.Slice(totalLength);
return true;
}Delimiter-Based Protocol Parser (Line Protocol)
基于分隔符的协议解析器(行协议)
csharp
static bool TryReadLine(
ref ReadOnlySequence<byte> buffer,
out ReadOnlySequence<byte> line)
{
// Look for the newline delimiter
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position is null)
{
line = default;
return false;
}
// Slice up to (not including) the delimiter
line = buffer.Slice(0, position.Value);
// Advance past the delimiter
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}csharp
static bool TryReadLine(
ref ReadOnlySequence<byte> buffer,
out ReadOnlySequence<byte> line)
{
// 查找换行分隔符
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position is null)
{
line = default;
return false;
}
// 切片到分隔符(不包含分隔符)
line = buffer.Slice(0, position.Value);
// 推进到分隔符之后
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}Working with ReadOnlySequence<byte>
使用ReadOnlySequence<byte>
ReadOnlySequence<byte>csharp
static string DecodeUtf8(ReadOnlySequence<byte> sequence)
{
// Fast path: single contiguous segment
if (sequence.IsSingleSegment)
{
return Encoding.UTF8.GetString(sequence.FirstSpan);
}
// Slow path: multi-segment -- rent a contiguous buffer
int length = (int)sequence.Length;
byte[] rented = ArrayPool<byte>.Shared.Rent(length);
try
{
sequence.CopyTo(rented);
return Encoding.UTF8.GetString(rented, 0, length);
}
finally
{
ArrayPool<byte>.Shared.Return(rented);
}
}ReadOnlySequence<byte>csharp
static string DecodeUtf8(ReadOnlySequence<byte> sequence)
{
// 快路径:单个连续段
if (sequence.IsSingleSegment)
{
return Encoding.UTF8.GetString(sequence.FirstSpan);
}
// 慢路径:多段 -- 租用连续缓冲区
int length = (int)sequence.Length;
byte[] rented = ArrayPool<byte>.Shared.Rent(length);
try
{
sequence.CopyTo(rented);
return Encoding.UTF8.GetString(rented, 0, length);
}
finally
{
ArrayPool<byte>.Shared.Return(rented);
}
}Stream Adapter
流适配器
Bridge with existing -based APIs using and .
System.IO.PipelinesStreamPipeReader.CreatePipeWriter.Createcsharp
// Wrap a NetworkStream for pipeline-based reading
await using var networkStream = tcpClient.GetStream();
var reader = PipeReader.Create(networkStream, new StreamPipeReaderOptions(
bufferSize: 4096,
minimumReadSize: 1024,
leaveOpen: true)); // Caller manages networkStream lifetime
try
{
await ProcessProtocolAsync(reader, cancellationToken);
}
finally
{
await reader.CompleteAsync();
}csharp
// Wrap a stream for pipeline-based writing
var writer = PipeWriter.Create(networkStream, new StreamPipeWriterOptions(
minimumBufferSize: 4096,
leaveOpen: true)); // Caller manages networkStream lifetime
try
{
await WriteResponseAsync(writer, response, cancellationToken);
}
finally
{
await writer.CompleteAsync();
}使用和将与现有基于的API桥接。
PipeReader.CreatePipeWriter.CreateSystem.IO.PipelinesStreamcsharp
// 包装NetworkStream以支持基于管道的读取
await using var networkStream = tcpClient.GetStream();
var reader = PipeReader.Create(networkStream, new StreamPipeReaderOptions(
bufferSize: 4096,
minimumReadSize: 1024,
leaveOpen: true)); // 调用方管理networkStream生命周期
try
{
await ProcessProtocolAsync(reader, cancellationToken);
}
finally
{
await reader.CompleteAsync();
}csharp
// 包装流以支持基于管道的写入
var writer = PipeWriter.Create(networkStream, new StreamPipeWriterOptions(
minimumBufferSize: 4096,
leaveOpen: true)); // 调用方管理networkStream生命周期
try
{
await WriteResponseAsync(writer, response, cancellationToken);
}
finally
{
await writer.CompleteAsync();
}Kestrel Integration
Kestrel集成
ASP.NET Core's Kestrel web server uses internally for HTTP request/response processing. Custom connection middleware can access the transport-level pipe directly.
System.IO.PipelinesASP.NET Core的Kestrel Web服务器内部使用处理HTTP请求/响应。自定义连接中间件可直接访问传输层管道。
System.IO.PipelinesConnection Middleware
连接中间件
csharp
// Custom connection middleware for protocol-level processing
builder.WebHost.ConfigureKestrel(options =>
{
options.ListenLocalhost(9000, listenOptions =>
{
listenOptions.UseConnectionHandler<MyProtocolHandler>();
});
});
public sealed class MyProtocolHandler : ConnectionHandler
{
public override async Task OnConnectedAsync(
ConnectionContext connection)
{
var reader = connection.Transport.Input;
var writer = connection.Transport.Output;
var ct = connection.ConnectionClosed;
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(ct);
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryParseMessage(ref buffer, out var payload))
{
var response = ProcessRequest(payload);
await WriteResponseAsync(writer, response);
}
reader.AdvanceTo(buffer.Start, buffer.End);
if (result.IsCompleted)
break;
}
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
private static async Task WriteResponseAsync(
PipeWriter writer, ReadOnlyMemory<byte> response)
{
// Write length prefix + payload
var memory = writer.GetMemory(4 + response.Length);
BinaryPrimitives.WriteInt32BigEndian(
memory.Span, response.Length);
response.CopyTo(memory[4..]);
writer.Advance(4 + response.Length);
await writer.FlushAsync();
}
}csharp
// 用于协议层处理的自定义连接中间件
builder.WebHost.ConfigureKestrel(options =>
{
options.ListenLocalhost(9000, listenOptions =>
{
listenOptions.UseConnectionHandler<MyProtocolHandler>();
});
});
public sealed class MyProtocolHandler : ConnectionHandler
{
public override async Task OnConnectedAsync(
ConnectionContext connection)
{
var reader = connection.Transport.Input;
var writer = connection.Transport.Output;
var ct = connection.ConnectionClosed;
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(ct);
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryParseMessage(ref buffer, out var payload))
{
var response = ProcessRequest(payload);
await WriteResponseAsync(writer, response);
}
reader.AdvanceTo(buffer.Start, buffer.End);
if (result.IsCompleted)
break;
}
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
private static async Task WriteResponseAsync(
PipeWriter writer, ReadOnlyMemory<byte> response)
{
// 写入长度前缀 + 负载
var memory = writer.GetMemory(4 + response.Length);
BinaryPrimitives.WriteInt32BigEndian(
memory.Span, response.Length);
response.CopyTo(memory[4..]);
writer.Advance(4 + response.Length);
await writer.FlushAsync();
}
}IDuplexPipe
IDuplexPipe
Kestrel exposes connections as , combining and into a single transport abstraction. This pattern also works for custom TCP servers, WebSocket handlers, and named-pipe protocols.
IDuplexPipePipeReaderPipeWritercsharp
public interface IDuplexPipe
{
PipeReader Input { get; }
PipeWriter Output { get; }
}Kestrel将连接暴露为,将和组合为单个传输抽象。此模式也适用于自定义TCP服务器、WebSocket处理程序和命名管道协议。
IDuplexPipePipeReaderPipeWritercsharp
public interface IDuplexPipe
{
PipeReader Input { get; }
PipeWriter Output { get; }
}Performance Tips
性能技巧
- Minimize copies -- use slicing instead of copying to
ReadOnlySequence<byte>. Parse directly from the sequence when possible.byte[] - Use /
GetSpancorrectly -- request the minimum size you need. The pipe may return a larger buffer, which is fine. Do not cache the returnedGetMemory/SpanacrossMemory/Advancecalls.FlushAsync - Set -- server code should never capture the synchronization context. This is the default for
useSynchronizationContext: falsebut explicit is clearer.PipeOptions - Tune pause/resume thresholds -- the defaults (64 KB / 32 KB) work for most scenarios. Increase for high-throughput bulk transfer; decrease for low-latency interactive protocols.
- Prefer -- for complex parsing,
SequenceReader<byte>providesSequenceReader<byte>,TryRead,TryReadBigEndian, andAdvancePastmethods that handle multi-segment sequences transparently.IsNext
csharp
static bool TryParseHeader(
ref ReadOnlySequence<byte> buffer,
out int messageType,
out int length)
{
var reader = new SequenceReader<byte>(buffer);
if (!reader.TryRead(out byte typeByte) ||
!reader.TryReadBigEndian(out int len))
{
messageType = 0;
length = 0;
return false;
}
messageType = typeByte;
length = len;
buffer = buffer.Slice(reader.Position);
return true;
}- 最小化复制 -- 使用切片而非复制到
ReadOnlySequence<byte>。尽可能直接从序列解析。byte[] - 正确使用/
GetSpan-- 请求所需的最小大小。管道可能返回更大的缓冲区,这没问题。不要在GetMemory/Advance调用之间缓存返回的FlushAsync/Span。Memory - 设置-- 服务器代码绝不能捕获同步上下文。这是
useSynchronizationContext: false的默认值,但显式设置更清晰。PipeOptions - 调整暂停/恢复阈值 -- 默认值(64 KB / 32 KB)适用于大多数场景。高吞吐量批量传输可增大阈值;低延迟交互协议可减小阈值。
- 优先使用-- 对于复杂解析,
SequenceReader<byte>提供SequenceReader<byte>、TryRead、TryReadBigEndian和AdvancePast方法,可透明处理多段序列。IsNext
csharp
static bool TryParseHeader(
ref ReadOnlySequence<byte> buffer,
out int messageType,
out int length)
{
var reader = new SequenceReader<byte>(buffer);
if (!reader.TryRead(out byte typeByte) ||
!reader.TryReadBigEndian(out int len))
{
messageType = 0;
length = 0;
return false;
}
messageType = typeByte;
length = len;
buffer = buffer.Slice(reader.Position);
return true;
}Agent Gotchas
常见陷阱
- Do not forget to call after
AdvanceTo-- skippingReadAsyncleaks pooled memory and eventually causesAdvanceTo. EveryOutOfMemoryExceptionmust be paired with anReadAsync.AdvanceTo - Do not access after calling
ReadResult.Buffer-- the underlying memory segments may be returned to the pool. Copy or parse all needed data before advancing.AdvanceTo - Do not set equal to
consumedwhen no complete message was found -- this creates a busy-wait loop. Setexaminedtoconsumed(nothing consumed) andbuffer.Starttoexamined(everything examined) so the pipe waits for new data.buffer.End - Do not ignore -- it means the reader has stopped consuming. Continue writing after this and data will be silently discarded.
FlushResult.IsCompleted - Do not use for simple stream-to-stream copying --
Pipeis simpler and equally efficient. Use pipelines when you need parsing, backpressure, or zero-copy slicing.Stream.CopyToAsync - Do not use methods on spans shorter than required -- always check
BinaryPrimitivesbefore reading fixed-width values to avoidbuffer.Length.ArgumentOutOfRangeException
- 不要忘记在后调用
ReadAsync-- 跳过AdvanceTo会导致池化内存泄漏,最终引发AdvanceTo。每个OutOfMemoryException必须对应一个ReadAsync。AdvanceTo - 调用后不要访问
AdvanceTo-- 底层内存段可能已返回池。在推进前复制或解析所有需要的数据。ReadResult.Buffer - 未找到完整消息时,不要将设为等于
consumed-- 这会创建忙等待循环。将examined设为consumed(无数据消费),buffer.Start设为examined(所有数据已检查),这样管道会等待新数据。buffer.End - 不要忽略-- 这表示读取端已停止消费。在此之后继续写入,数据会被静默丢弃。
FlushResult.IsCompleted - 不要使用进行简单的流到流复制 --
Pipe更简单且效率相当。当需要解析、背压或零拷贝切片时再使用Pipelines。Stream.CopyToAsync - 不要在长度不足的Span上使用方法 -- 读取固定宽度值前务必检查
BinaryPrimitives,避免buffer.Length。ArgumentOutOfRangeException
Knowledge Sources
知识来源
- Stephen Toub, System.IO.Pipelines: High performance IO in .NET -- canonical deep dive on pipeline design, motivation, and usage patterns
- Stephen Toub,System.IO.Pipelines: High performance IO in .NET -- 关于管道设计、动机和使用模式的权威深度解析