Loading...
Loading...
Builds high-perf network I/O. PipeReader/PipeWriter, backpressure, protocol parsers, Kestrel.
npx skill4agent add novotnyllc/dotnet-artisan dotnet-io-pipelinesSystem.IO.PipelinesPipeReaderPipeWriterStreamSystem.IO.Pipelines| Problem | Stream Approach | Pipeline Approach |
|---|---|---|
| Buffer management | Allocate | Automatic pooled buffer management |
| Partial reads | Track position, concatenate fragments | |
| Backpressure | None -- writer can outpace reader | Built-in pause/resume thresholds |
| Memory copies | Copy between buffers at each layer | Zero-copy slicing with |
| Lifetime management | Manual | Pooled memory returned on |
PipePipeWriterPipeReader// Create a pipe with default options (uses ArrayPool internally)
var pipe = new Pipe();
PipeWriter writer = pipe.Writer; // Producer side
PipeReader reader = pipe.Reader; // Consumer sideasync Task FillPipeAsync(Stream source, PipeWriter writer,
CancellationToken ct)
{
const int minimumBufferSize = 512;
while (true)
{
// Request a buffer from the pipe's memory pool
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
int bytesRead = await source.ReadAsync(memory, ct);
if (bytesRead == 0)
break; // End of stream
// Tell the pipe how many bytes were written
writer.Advance(bytesRead);
// Flush makes data available to the reader.
// FlushAsync may pause here if the reader is slow (backpressure).
FlushResult result = await writer.FlushAsync(ct);
if (result.IsCompleted)
break; // Reader stopped consuming
}
// Signal completion -- reader will see IsCompleted = true
await writer.CompleteAsync();
}GetMemoryGetSpanAdvanceAdvanceFlushAsyncasync Task ReadPipeAsync(PipeReader reader, CancellationToken ct)
{
while (true)
{
ReadResult result = await reader.ReadAsync(ct);
ReadOnlySequence<byte> buffer = result.Buffer;
// Try to parse messages from the buffer
while (TryParseMessage(ref buffer, out var message))
{
await ProcessMessageAsync(message, ct);
}
// Tell the pipe how much was consumed and how much was examined.
// consumed: data that has been fully processed (will be freed)
// examined: data that has been looked at (won't trigger re-read
// until new data arrives)
reader.AdvanceTo(buffer.Start, buffer.End);
if (result.IsCompleted)
break; // Writer finished and all data consumed
}
await reader.CompleteAsync();
}AdvanceToReadAsyncconsumedexaminedconsumedexaminedReadResult.BufferAdvanceTovar pipe = new Pipe(new PipeOptions(
pauseWriterThreshold: 64 * 1024, // Pause writer at 64 KB buffered
resumeWriterThreshold: 32 * 1024, // Resume writer when buffer drops to 32 KB
minimumSegmentSize: 4096,
useSynchronizationContext: false));| Option | Default | Purpose |
|---|---|---|
| 65,536 | |
| 32,768 | |
| 4,096 | Minimum buffer segment allocation size |
| | Set |
FlushAsyncAdvancePauseWriterThresholdFlushAsyncResumeWriterThresholdReadOnlySequence<byte>static bool TryParseMessage(
ref ReadOnlySequence<byte> buffer,
out ReadOnlySequence<byte> payload)
{
payload = default;
// Need at least 4 bytes for the length prefix
if (buffer.Length < 4)
return false;
// Read length from first 4 bytes
int length;
if (buffer.FirstSpan.Length >= 4)
{
length = BinaryPrimitives.ReadInt32BigEndian(buffer.FirstSpan);
}
else
{
// Slow path: length header spans multiple segments
Span<byte> lengthBytes = stackalloc byte[4];
buffer.Slice(0, 4).CopyTo(lengthBytes);
length = BinaryPrimitives.ReadInt32BigEndian(lengthBytes);
}
// Validate length to prevent allocation attacks
if (length < 0 || length > 1_048_576) // 1 MB max
throw new ProtocolViolationException(
$"Invalid message length: {length}");
// Check if the full message is available
long totalLength = 4 + length;
if (buffer.Length < totalLength)
return false;
// Extract the payload (zero-copy slice)
payload = buffer.Slice(4, length);
// Advance the buffer past this message
buffer = buffer.Slice(totalLength);
return true;
}static bool TryReadLine(
ref ReadOnlySequence<byte> buffer,
out ReadOnlySequence<byte> line)
{
// Look for the newline delimiter
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position is null)
{
line = default;
return false;
}
// Slice up to (not including) the delimiter
line = buffer.Slice(0, position.Value);
// Advance past the delimiter
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}ReadOnlySequence<byte>static string DecodeUtf8(ReadOnlySequence<byte> sequence)
{
// Fast path: single contiguous segment
if (sequence.IsSingleSegment)
{
return Encoding.UTF8.GetString(sequence.FirstSpan);
}
// Slow path: multi-segment -- rent a contiguous buffer
int length = (int)sequence.Length;
byte[] rented = ArrayPool<byte>.Shared.Rent(length);
try
{
sequence.CopyTo(rented);
return Encoding.UTF8.GetString(rented, 0, length);
}
finally
{
ArrayPool<byte>.Shared.Return(rented);
}
}System.IO.PipelinesStreamPipeReader.CreatePipeWriter.Create// Wrap a NetworkStream for pipeline-based reading
await using var networkStream = tcpClient.GetStream();
var reader = PipeReader.Create(networkStream, new StreamPipeReaderOptions(
bufferSize: 4096,
minimumReadSize: 1024,
leaveOpen: true)); // Caller manages networkStream lifetime
try
{
await ProcessProtocolAsync(reader, cancellationToken);
}
finally
{
await reader.CompleteAsync();
}// Wrap a stream for pipeline-based writing
var writer = PipeWriter.Create(networkStream, new StreamPipeWriterOptions(
minimumBufferSize: 4096,
leaveOpen: true)); // Caller manages networkStream lifetime
try
{
await WriteResponseAsync(writer, response, cancellationToken);
}
finally
{
await writer.CompleteAsync();
}System.IO.Pipelines// Custom connection middleware for protocol-level processing
builder.WebHost.ConfigureKestrel(options =>
{
options.ListenLocalhost(9000, listenOptions =>
{
listenOptions.UseConnectionHandler<MyProtocolHandler>();
});
});
public sealed class MyProtocolHandler : ConnectionHandler
{
public override async Task OnConnectedAsync(
ConnectionContext connection)
{
var reader = connection.Transport.Input;
var writer = connection.Transport.Output;
var ct = connection.ConnectionClosed;
try
{
while (true)
{
ReadResult result = await reader.ReadAsync(ct);
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryParseMessage(ref buffer, out var payload))
{
var response = ProcessRequest(payload);
await WriteResponseAsync(writer, response);
}
reader.AdvanceTo(buffer.Start, buffer.End);
if (result.IsCompleted)
break;
}
}
finally
{
await reader.CompleteAsync();
await writer.CompleteAsync();
}
}
private static async Task WriteResponseAsync(
PipeWriter writer, ReadOnlyMemory<byte> response)
{
// Write length prefix + payload
var memory = writer.GetMemory(4 + response.Length);
BinaryPrimitives.WriteInt32BigEndian(
memory.Span, response.Length);
response.CopyTo(memory[4..]);
writer.Advance(4 + response.Length);
await writer.FlushAsync();
}
}IDuplexPipePipeReaderPipeWriterpublic interface IDuplexPipe
{
PipeReader Input { get; }
PipeWriter Output { get; }
}ReadOnlySequence<byte>byte[]GetSpanGetMemorySpanMemoryAdvanceFlushAsyncuseSynchronizationContext: falsePipeOptionsSequenceReader<byte>SequenceReader<byte>TryReadTryReadBigEndianAdvancePastIsNextstatic bool TryParseHeader(
ref ReadOnlySequence<byte> buffer,
out int messageType,
out int length)
{
var reader = new SequenceReader<byte>(buffer);
if (!reader.TryRead(out byte typeByte) ||
!reader.TryReadBigEndian(out int len))
{
messageType = 0;
length = 0;
return false;
}
messageType = typeByte;
length = len;
buffer = buffer.Slice(reader.Position);
return true;
}AdvanceToReadAsyncAdvanceToOutOfMemoryExceptionReadAsyncAdvanceToReadResult.BufferAdvanceToconsumedexaminedconsumedbuffer.Startexaminedbuffer.EndFlushResult.IsCompletedPipeStream.CopyToAsyncBinaryPrimitivesbuffer.LengthArgumentOutOfRangeException