grpc-microservices
gRPC Microservices
A comprehensive skill for building high-performance, type-safe microservices using gRPC and Protocol Buffers. This skill covers service design, all streaming patterns, interceptors, load balancing, error handling, and production deployment patterns for distributed systems.
When to Use This Skill
Use this skill when:
- Building microservices that require high-performance, low-latency communication
- Implementing real-time data streaming between services
- Designing type-safe APIs with strong contracts using Protocol Buffers
- Creating polyglot systems where services are written in different languages
- Building distributed systems requiring bidirectional streaming
- Implementing service meshes with advanced routing and observability
- Designing APIs that need to evolve with backward/forward compatibility
- Creating internal APIs where performance and type safety are critical
- Building event-driven architectures with streaming data pipelines
- Implementing client-server systems with push capabilities (server streaming)
- Designing systems requiring efficient binary serialization
- Building microservices requiring automatic code generation for multiple languages
Core Concepts
gRPC Fundamentals
gRPC is a modern open-source RPC framework that can run anywhere. It enables client and server applications to communicate transparently and makes it easier to build connected systems.
Key Characteristics:
- HTTP/2 based: Multiplexing, server push, header compression
- Protocol Buffers: Efficient binary serialization format
- Streaming: Bidirectional streaming support built-in
- Code Generation: Auto-generate client/server code in 10+ languages
- Deadlines/Timeouts: First-class timeout support
- Cancellation: Propagate cancellation across services
- Interceptors: Middleware pattern for cross-cutting concerns
Protocol Buffers (protobuf)
Protocol Buffers is a language-neutral, platform-neutral extensible mechanism for serializing structured data.
Advantages:
- Compact: 3-10x smaller than JSON
- Fast: 20-100x faster to serialize/deserialize than JSON
- Type-safe: Strongly typed schema with validation
- Backward/Forward Compatible: Evolve schemas safely
- Language Support: Official support for 10+ languages
- Self-documenting: Schema serves as documentation
Basic Syntax:
```protobuf
syntax = "proto3";

message User {
  int32 id = 1;
  string name = 2;
  string email = 3;
}
```
Service Definitions
gRPC services are defined in `.proto` files, which specify the available methods and their input/output message types.
Basic Service:
```protobuf
service UserService {
  rpc GetUser(GetUserRequest) returns (GetUserResponse);
  rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
}
```
Four Types of RPC Methods
1. Unary RPC (Request-Response)
Simple request-response pattern, like a traditional REST API call.
```protobuf
rpc GetUser(GetUserRequest) returns (GetUserResponse);
```

Use Cases:
- CRUD operations
- Simple queries
- Synchronous operations
- Traditional request-response patterns
2. Server Streaming RPC
Client sends one request, server returns a stream of responses.
```protobuf
rpc ListUsers(ListUsersRequest) returns (stream User);
```

Use Cases:
- Paginated results
- Real-time updates
- Server-side event push
- Large dataset downloads
3. Client Streaming RPC
Client sends a stream of requests, server returns one response.
```protobuf
rpc CreateUsers(stream CreateUserRequest) returns (CreateUsersResponse);
```

Use Cases:
- Bulk uploads
- Batch processing
- Client-side aggregation
- File uploads in chunks
4. Bidirectional Streaming RPC
Both client and server send streams of messages independently.
```protobuf
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
```

Use Cases:
- Real-time chat applications
- Live collaboration
- Gaming (real-time state sync)
- IoT bidirectional communication
Protobuf Schema Design
Message Design Best Practices
1. Use Explicit Field Numbers
Field numbers are critical for backward compatibility and should never be reused.
```protobuf
message User {
  int32 id = 1;      // Never change this number
  string name = 2;   // Never change this number
  string email = 3;  // Never change this number
  // int32 age = 4;  // DEPRECATED - don't reuse 4
  string phone = 5;  // New field - use next available
}
```

2. Use Enumerations for Fixed Sets
```protobuf
enum UserRole {
  USER_ROLE_UNSPECIFIED = 0; // Always have a zero value
  USER_ROLE_ADMIN = 1;
  USER_ROLE_MODERATOR = 2;
  USER_ROLE_MEMBER = 3;
}

message User {
  int32 id = 1;
  string name = 2;
  UserRole role = 3;
}
```

3. Use Nested Messages for Complex Types
```protobuf
message User {
  int32 id = 1;
  string name = 2;

  message Address {
    string street = 1;
    string city = 2;
    string state = 3;
    string zip = 4;
  }

  Address address = 3;
  repeated Address additional_addresses = 4;
}
```

4. Use `repeated` for Arrays

```protobuf
message UserList {
  repeated User users = 1;
}

message User {
  int32 id = 1;
  string name = 2;
  repeated string tags = 3;
}
```

5. Use `oneof` for Union Types
```protobuf
message SearchRequest {
  string query = 1;

  oneof filter {
    string category = 2;
    int32 user_id = 3;
    string tag = 4;
  }
}
```

6. Use `google.protobuf` Well-Known Types

```protobuf
import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";

message Event {
  string id = 1;
  string name = 2;
  google.protobuf.Timestamp created_at = 3;
  google.protobuf.Duration duration = 4;
  google.protobuf.Int32Value optional_count = 5;
}
```
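Field numbers are more than documentation: each one is encoded into the wire-format key of its field, which is why renumbering silently changes the meaning of old payloads. A minimal Go sketch of the key computation using only the standard library (`fieldTag` and `encodeTag` are illustrative helpers, not part of any protobuf runtime):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// fieldTag computes the protobuf wire-format key for a field:
// (field_number << 3) | wire_type. Renumbering a field changes
// this key, breaking every previously serialized message.
func fieldTag(fieldNumber, wireType uint64) uint64 {
	return fieldNumber<<3 | wireType
}

// encodeTag varint-encodes the key, as protobuf does on the wire.
func encodeTag(fieldNumber, wireType uint64) []byte {
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(buf, fieldTag(fieldNumber, wireType))
	return buf[:n]
}

func main() {
	// string name = 2; → field 2, wire type 2 (length-delimited)
	fmt.Println(fieldTag(2, 2))          // 18
	fmt.Printf("%#x\n", encodeTag(2, 2)) // 0x12
}
```

Fields numbered 1-15 encode the whole key in a single byte, which is why the most frequently set fields should get the low numbers.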
Service Design Patterns
1. Resource-Oriented Design
Follow RESTful principles adapted for RPC:
```protobuf
service UserService {
  // Get single resource
  rpc GetUser(GetUserRequest) returns (User);

  // List resources
  rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);

  // Create resource
  rpc CreateUser(CreateUserRequest) returns (User);

  // Update resource
  rpc UpdateUser(UpdateUserRequest) returns (User);

  // Delete resource
  rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty);
}
```

2. Pagination Pattern
```protobuf
message ListUsersRequest {
  int32 page_size = 1;
  string page_token = 2;
  string filter = 3;
}

message ListUsersResponse {
  repeated User users = 1;
  string next_page_token = 2;
  int32 total_count = 3;
}
```

3. Batch Operations Pattern
```protobuf
message BatchGetUsersRequest {
  repeated int32 user_ids = 1;
}

message BatchGetUsersResponse {
  map<int32, User> users = 1;
  repeated int32 not_found = 2;
}
```

4. Long-Running Operations Pattern
```protobuf
import "google/longrunning/operations.proto";

service BatchJobService {
  rpc ProcessBatch(BatchRequest) returns (google.longrunning.Operation);
  rpc GetOperation(GetOperationRequest) returns (google.longrunning.Operation);
}
```
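The `page_token` in the pagination pattern is best treated as an opaque cursor so clients never depend on its internal format. A minimal Go sketch of one way to mint such tokens, assuming a simple offset-based cursor (a production service would typically sign or encrypt the token; the helper names are hypothetical):

```go
package main

import (
	"encoding/base64"
	"fmt"
	"strconv"
)

// encodePageToken turns the next offset into an opaque cursor so
// clients cannot parse or fabricate it casually.
func encodePageToken(offset int) string {
	return base64.URLEncoding.EncodeToString([]byte(strconv.Itoa(offset)))
}

// decodePageToken reverses encodePageToken; an empty token means
// "start from the beginning", matching the usual List semantics.
func decodePageToken(token string) (int, error) {
	if token == "" {
		return 0, nil
	}
	raw, err := base64.URLEncoding.DecodeString(token)
	if err != nil {
		return 0, err
	}
	return strconv.Atoi(string(raw))
}

func main() {
	tok := encodePageToken(40)
	off, _ := decodePageToken(tok)
	fmt.Println(tok, off)
}
```

A malformed token should be rejected with `codes.InvalidArgument`, per the status codes described later in this document.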
Streaming Patterns
Server Streaming Patterns
1. Pagination Streaming
Stream large result sets efficiently:
```protobuf
service ProductService {
  rpc SearchProducts(SearchRequest) returns (stream Product);
}

message SearchRequest {
  string query = 1;
  int32 limit = 2;
}
```

Implementation (Go):
```go
func (s *server) SearchProducts(req *pb.SearchRequest, stream pb.ProductService_SearchProductsServer) error {
	products := s.db.Search(req.Query, req.Limit)
	// Index into the slice rather than taking the address of the
	// range variable, so each Send gets a distinct pointer.
	for i := range products {
		if err := stream.Send(&products[i]); err != nil {
			return err
		}
	}
	return nil
}
```

2. Real-Time Updates
Push updates to clients as they occur:
```protobuf
service EventService {
  rpc SubscribeToEvents(SubscribeRequest) returns (stream Event);
}

message SubscribeRequest {
  repeated string event_types = 1;
  google.protobuf.Timestamp since = 2;
}
```

3. Log Tailing
Stream logs or audit trails:
```protobuf
service LogService {
  rpc TailLogs(TailRequest) returns (stream LogEntry);
}

message TailRequest {
  string service_name = 1;
  string level = 2;
  int32 lines = 3;
}
```
Client Streaming Patterns
1. Bulk Upload
Client streams data, server processes and returns summary:
```protobuf
service UploadService {
  rpc UploadImages(stream ImageChunk) returns (UploadSummary);
}

message ImageChunk {
  string filename = 1;
  bytes data = 2;
  int32 chunk_number = 3;
}

message UploadSummary {
  int32 total_images = 1;
  int64 total_bytes = 2;
  repeated string uploaded_filenames = 3;
}
```

Implementation (Go):
```go
func (s *server) UploadImages(stream pb.UploadService_UploadImagesServer) error {
	var count int32
	var totalBytes int64
	var filenames []string

	for {
		chunk, err := stream.Recv()
		if err == io.EOF {
			return stream.SendAndClose(&pb.UploadSummary{
				TotalImages:       count,
				TotalBytes:        totalBytes,
				UploadedFilenames: filenames,
			})
		}
		if err != nil {
			return err
		}

		// Process chunk
		totalBytes += int64(len(chunk.Data))
		if chunk.ChunkNumber == 0 {
			count++
			filenames = append(filenames, chunk.Filename)
		}
	}
}
```

2. Aggregation
Client sends multiple data points, server aggregates:
```protobuf
service AnalyticsService {
  rpc RecordMetrics(stream Metric) returns (AggregateResult);
}

message Metric {
  string name = 1;
  double value = 2;
  google.protobuf.Timestamp timestamp = 3;
}
```
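The aggregation logic inside a client-streaming handler is easiest to test when pulled out of the `Recv` loop. A standard-library sketch of the fold used in the upload example above (the `Chunk` and `Summary` types are illustrative stand-ins for the generated protobuf structs):

```go
package main

import "fmt"

// Chunk mirrors the ImageChunk message: chunk_number 0 marks the
// first chunk of a new file.
type Chunk struct {
	Filename    string
	Data        []byte
	ChunkNumber int32
}

// Summary mirrors UploadSummary.
type Summary struct {
	TotalImages int32
	TotalBytes  int64
	Filenames   []string
}

// summarize is the pure core of the client-streaming handler: it
// folds the sequence of received chunks into the single response
// that SendAndClose would return.
func summarize(chunks []Chunk) Summary {
	var s Summary
	for _, c := range chunks {
		s.TotalBytes += int64(len(c.Data))
		if c.ChunkNumber == 0 {
			s.TotalImages++
			s.Filenames = append(s.Filenames, c.Filename)
		}
	}
	return s
}

func main() {
	s := summarize([]Chunk{
		{Filename: "a.png", Data: []byte("1234"), ChunkNumber: 0},
		{Filename: "a.png", Data: []byte("56"), ChunkNumber: 1},
		{Filename: "b.png", Data: []byte("789"), ChunkNumber: 0},
	})
	fmt.Println(s.TotalImages, s.TotalBytes) // 2 9
}
```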
Bidirectional Streaming Patterns
1. Chat Application
Real-time bidirectional communication:
```protobuf
service ChatService {
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

message ChatMessage {
  string user_id = 1;
  string room_id = 2;
  string content = 3;
  google.protobuf.Timestamp timestamp = 4;
}
```

Implementation (Go):
```go
func (s *server) Chat(stream pb.ChatService_ChatServer) error {
	// Create channel for this client
	clientID := uuid.New().String()
	msgChan := make(chan *pb.ChatMessage, 10)

	// Register client
	s.mu.Lock()
	s.clients[clientID] = msgChan
	s.mu.Unlock()

	defer func() {
		s.mu.Lock()
		delete(s.clients, clientID)
		close(msgChan)
		s.mu.Unlock()
	}()

	// Goroutine to send messages to client
	go func() {
		for msg := range msgChan {
			if err := stream.Send(msg); err != nil {
				return
			}
		}
	}()

	// Receive messages from client
	for {
		msg, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		// Broadcast to all clients in room
		s.broadcast(msg)
	}
}
```

2. Live Collaboration
Real-time document editing:
```protobuf
service CollaborationService {
  rpc Collaborate(stream DocumentEdit) returns (stream DocumentEdit);
}

message DocumentEdit {
  string document_id = 1;
  string user_id = 2;
  int32 position = 3;
  string content = 4;

  enum Operation {
    OPERATION_UNSPECIFIED = 0;
    OPERATION_INSERT = 1;
    OPERATION_DELETE = 2;
    OPERATION_UPDATE = 3;
  }
  Operation operation = 5;
}
```

3. Game State Synchronization
Real-time multiplayer game updates:
```protobuf
service GameService {
  rpc PlayGame(stream GameAction) returns (stream GameState);
}

message GameAction {
  string player_id = 1;
  string game_id = 2;
  string action_type = 3;
  bytes action_data = 4;
}

message GameState {
  string game_id = 1;
  repeated PlayerState players = 2;
  bytes world_state = 3;
  google.protobuf.Timestamp timestamp = 4;
}
```
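The chat server above leans on a registry of per-client channels (`s.clients`, `s.broadcast`). That registry can be sketched on its own with the standard library; the `hub` type and its non-blocking broadcast are one possible design, not part of gRPC:

```go
package main

import (
	"fmt"
	"sync"
)

// hub holds one outbound channel per connected client, guarded by
// a mutex, mirroring the s.clients map in the chat handler.
type hub struct {
	mu      sync.Mutex
	clients map[string]chan string
}

func newHub() *hub {
	return &hub{clients: make(map[string]chan string)}
}

func (h *hub) register(id string) chan string {
	ch := make(chan string, 10)
	h.mu.Lock()
	h.clients[id] = ch
	h.mu.Unlock()
	return ch
}

func (h *hub) unregister(id string) {
	h.mu.Lock()
	if ch, ok := h.clients[id]; ok {
		delete(h.clients, id)
		close(ch)
	}
	h.mu.Unlock()
}

// broadcast fans a message out to every connected client without
// blocking on slow ones: if a client's buffer is full, the message
// is dropped for that client rather than stalling the room.
func (h *hub) broadcast(msg string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for _, ch := range h.clients {
		select {
		case ch <- msg:
		default: // slow client: drop instead of blocking
		}
	}
}

func main() {
	h := newHub()
	alice := h.register("alice")
	h.register("bob")
	h.broadcast("hello")
	fmt.Println(<-alice)
	h.unregister("alice")
	h.unregister("bob")
}
```

Dropping on a full buffer is a deliberate trade-off; a production chat service might instead disconnect clients that fall too far behind.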
Interceptors (Middleware)
Interceptors provide a way to add cross-cutting concerns to gRPC services.
Unary Interceptors
Server-side Unary Interceptor:
```go
func UnaryServerInterceptor() grpc.UnaryServerInterceptor {
	return func(
		ctx context.Context,
		req interface{},
		info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler,
	) (interface{}, error) {
		// Pre-processing
		start := time.Now()
		log.Printf("Method: %s, Start: %v", info.FullMethod, start)

		// Call the handler
		resp, err := handler(ctx, req)

		// Post-processing
		duration := time.Since(start)
		log.Printf("Method: %s, Duration: %v, Error: %v",
			info.FullMethod, duration, err)

		return resp, err
	}
}

// Usage
server := grpc.NewServer(
	grpc.UnaryInterceptor(UnaryServerInterceptor()),
)
```

Client-side Unary Interceptor:
```go
func UnaryClientInterceptor() grpc.UnaryClientInterceptor {
	return func(
		ctx context.Context,
		method string,
		req, reply interface{},
		cc *grpc.ClientConn,
		invoker grpc.UnaryInvoker,
		opts ...grpc.CallOption,
	) error {
		start := time.Now()

		// Call the remote method
		err := invoker(ctx, method, req, reply, cc, opts...)

		log.Printf("Method: %s, Duration: %v, Error: %v",
			method, time.Since(start), err)
		return err
	}
}

// Usage
conn, err := grpc.Dial(
	address,
	grpc.WithUnaryInterceptor(UnaryClientInterceptor()),
)
```
Streaming Interceptors
Server-side Stream Interceptor:
```go
func StreamServerInterceptor() grpc.StreamServerInterceptor {
	return func(
		srv interface{},
		ss grpc.ServerStream,
		info *grpc.StreamServerInfo,
		handler grpc.StreamHandler,
	) error {
		log.Printf("Stream started: %s", info.FullMethod)
		err := handler(srv, ss)
		log.Printf("Stream ended: %s, Error: %v", info.FullMethod, err)
		return err
	}
}
```
Common Interceptor Patterns
1. Authentication Interceptor
```go
func AuthInterceptor(secret string) grpc.UnaryServerInterceptor {
	return func(
		ctx context.Context,
		req interface{},
		info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler,
	) (interface{}, error) {
		// Extract metadata
		md, ok := metadata.FromIncomingContext(ctx)
		if !ok {
			return nil, status.Error(codes.Unauthenticated, "no metadata")
		}

		// Get authorization token
		tokens := md["authorization"]
		if len(tokens) == 0 {
			return nil, status.Error(codes.Unauthenticated, "no token")
		}

		// Validate token
		token := tokens[0]
		claims, err := validateJWT(token, secret)
		if err != nil {
			return nil, status.Error(codes.Unauthenticated, "invalid token")
		}

		// Add claims to context
		ctx = context.WithValue(ctx, "claims", claims)
		return handler(ctx, req)
	}
}
```
2. Logging Interceptor
```go
func LoggingInterceptor(logger *log.Logger) grpc.UnaryServerInterceptor {
	return func(
		ctx context.Context,
		req interface{},
		info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler,
	) (interface{}, error) {
		start := time.Now()

		// Get request ID from metadata
		requestID := getRequestID(ctx)
		logger.Printf("[%s] Request: %s", requestID, info.FullMethod)

		resp, err := handler(ctx, req)

		duration := time.Since(start)
		statusCode := status.Code(err)
		logger.Printf("[%s] Response: %s, Duration: %v, Status: %v",
			requestID, info.FullMethod, duration, statusCode)
		return resp, err
	}
}
```
3. Rate Limiting Interceptor
```go
func RateLimitInterceptor(limiter *rate.Limiter) grpc.UnaryServerInterceptor {
	return func(
		ctx context.Context,
		req interface{},
		info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler,
	) (interface{}, error) {
		if !limiter.Allow() {
			return nil, status.Error(
				codes.ResourceExhausted,
				"rate limit exceeded",
			)
		}
		return handler(ctx, req)
	}
}
```
4. Tracing Interceptor (OpenTelemetry)
```go
func TracingInterceptor(tracer trace.Tracer) grpc.UnaryServerInterceptor {
	return func(
		ctx context.Context,
		req interface{},
		info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler,
	) (interface{}, error) {
		ctx, span := tracer.Start(ctx, info.FullMethod)
		defer span.End()

		// Add attributes
		span.SetAttributes(
			attribute.String("rpc.method", info.FullMethod),
			attribute.String("rpc.service", "MyService"),
		)

		resp, err := handler(ctx, req)
		if err != nil {
			span.RecordError(err)
			span.SetStatus(codes2.Error, err.Error())
		} else {
			span.SetStatus(codes2.Ok, "")
		}
		return resp, err
	}
}
```
5. Error Recovery Interceptor
```go
func RecoveryInterceptor() grpc.UnaryServerInterceptor {
	return func(
		ctx context.Context,
		req interface{},
		info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler,
	) (resp interface{}, err error) {
		defer func() {
			if r := recover(); r != nil {
				log.Printf("Panic recovered: %v\n%s", r, debug.Stack())
				err = status.Error(codes.Internal, "internal server error")
			}
		}()
		return handler(ctx, req)
	}
}
```
Chaining Multiple Interceptors
```go
server := grpc.NewServer(
	grpc.ChainUnaryInterceptor(
		RecoveryInterceptor(),
		LoggingInterceptor(logger),
		TracingInterceptor(tracer),
		AuthInterceptor(jwtSecret),
		RateLimitInterceptor(limiter),
	),
	grpc.ChainStreamInterceptor(
		StreamRecoveryInterceptor(),
		StreamLoggingInterceptor(logger),
	),
)
```
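Interceptor chaining is ordinary function composition: each interceptor wraps the next handler, and the first interceptor listed becomes the outermost layer (so recovery above catches panics from everything inside it). A standard-library sketch of that ordering, with strings standing in for requests (the types and helpers are illustrative, not the gRPC API):

```go
package main

import "fmt"

// handler and middleware model grpc.UnaryHandler and a unary
// interceptor, reduced to strings so the wiring is visible.
type handler func(req string) string
type middleware func(next handler) handler

// chain composes interceptors so that the first one listed is the
// outermost, matching grpc.ChainUnaryInterceptor's ordering.
func chain(h handler, mws ...middleware) handler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// tag wraps the request in a labeled layer so the nesting order
// shows up in the output.
func tag(name string) middleware {
	return func(next handler) handler {
		return func(req string) string {
			return name + "(" + next(req) + ")"
		}
	}
}

func main() {
	h := chain(func(req string) string { return req },
		tag("recovery"), tag("logging"), tag("auth"))
	fmt.Println(h("req")) // recovery(logging(auth(req)))
}
```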
Load Balancing
Client-Side Load Balancing
gRPC provides built-in client-side load balancing with multiple policies.
1. Round Robin
```go
// Blank import registers the balancer (recent grpc-go versions
// register round_robin by default).
import _ "google.golang.org/grpc/balancer/roundrobin"

conn, err := grpc.Dial(
	"dns:///my-service.example.com:50051",
	grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
	grpc.WithInsecure(),
)
```

2. Pick First (Default)
```go
conn, err := grpc.Dial(
	"dns:///my-service.example.com:50051",
	grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"pick_first"}`),
	grpc.WithInsecure(),
)
```

3. Custom Resolver
Implement custom service discovery:
```go
type exampleResolver struct {
	target     resolver.Target
	cc         resolver.ClientConn
	addrsStore map[string][]string
}

func (r *exampleResolver) ResolveNow(resolver.ResolveNowOptions) {
	// Discover service addresses
	addresses := r.discoverServices()

	var addrs []resolver.Address
	for _, addr := range addresses {
		addrs = append(addrs, resolver.Address{Addr: addr})
	}
	r.cc.UpdateState(resolver.State{Addresses: addrs})
}

func init() {
	resolver.Register(&exampleResolverBuilder{})
}
```
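Under round_robin, the balancer's picker hands out ready connections in rotation. The rotation itself is small enough to sketch with the standard library (`rrPicker` is an illustrative model, not grpc-go's actual picker type):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// rrPicker cycles through backend addresses the way the
// round_robin policy cycles through ready subchannels.
type rrPicker struct {
	addrs []string
	next  uint64
}

// pick atomically advances the counter so concurrent RPCs each
// get the next address in rotation.
func (p *rrPicker) pick() string {
	n := atomic.AddUint64(&p.next, 1) - 1
	return p.addrs[n%uint64(len(p.addrs))]
}

func main() {
	p := &rrPicker{addrs: []string{"10.0.0.1:50051", "10.0.0.2:50051", "10.0.0.3:50051"}}
	for i := 0; i < 4; i++ {
		fmt.Println(p.pick()) // cycles through all three, then wraps
	}
}
```

Because gRPC connections are long-lived HTTP/2 streams, this per-RPC rotation matters: connection-level (L4) load balancers pin all of a client's requests to one backend, which is why gRPC favors client-side or L7 balancing.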
Load Balancing with Service Mesh
Kubernetes with Service Mesh (Istio/Linkerd):
```yaml
apiVersion: v1
kind: Service
metadata:
  name: grpc-service
spec:
  selector:
    app: grpc-app
  ports:
  - name: grpc
    port: 50051
    targetPort: 50051
    protocol: TCP
  type: ClusterIP
---
apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
  name: grpc-service
spec:
  host: grpc-service
  trafficPolicy:
    loadBalancer:
      simple: ROUND_ROBIN
    connectionPool:
      http:
        http2MaxRequests: 1000
        maxRequestsPerConnection: 10
```
Health Checking
Implement health check service:
```protobuf
service Health {
  rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
  rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}

message HealthCheckRequest {
  string service = 1;
}

message HealthCheckResponse {
  enum ServingStatus {
    UNKNOWN = 0;
    SERVING = 1;
    NOT_SERVING = 2;
    SERVICE_UNKNOWN = 3;
  }
  ServingStatus status = 1;
}
```

Implementation:
```go
import "google.golang.org/grpc/health"
import healthpb "google.golang.org/grpc/health/grpc_health_v1"

healthServer := health.NewServer()
healthpb.RegisterHealthServer(grpcServer, healthServer)

// Set service status
healthServer.SetServingStatus("UserService", healthpb.HealthCheckResponse_SERVING)
```
Error Handling
gRPC Status Codes
gRPC uses standardized status codes for error handling:
```go
import "google.golang.org/grpc/codes"
import "google.golang.org/grpc/status"

// Return errors with appropriate codes
func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
	if req.Id <= 0 {
		return nil, status.Error(codes.InvalidArgument, "id must be positive")
	}

	user, err := s.db.GetUser(req.Id)
	if err == sql.ErrNoRows {
		return nil, status.Error(codes.NotFound, "user not found")
	}
	if err != nil {
		return nil, status.Error(codes.Internal, "database error")
	}
	return user, nil
}
```

Common Status Codes:
- `OK`: Success
- `Canceled`: Operation was cancelled
- `Unknown`: Unknown error
- `InvalidArgument`: Client specified invalid argument
- `DeadlineExceeded`: Deadline expired before operation completed
- `NotFound`: Entity not found
- `AlreadyExists`: Entity already exists
- `PermissionDenied`: Permission denied
- `ResourceExhausted`: Resource exhausted (rate limit)
- `FailedPrecondition`: Operation rejected (system not in valid state)
- `Aborted`: Operation aborted
- `OutOfRange`: Out of valid range
- `Unimplemented`: Operation not implemented
- `Internal`: Internal server error
- `Unavailable`: Service unavailable
- `DataLoss`: Unrecoverable data loss
- `Unauthenticated`: Request lacks valid authentication
Rich Error Details
Add structured error details:

```go
import "google.golang.org/genproto/googleapis/rpc/errdetails"

func (s *server) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.User, error) {
    // Validate request
    violations := validateCreateUserRequest(req)
    if len(violations) > 0 {
        badRequest := &errdetails.BadRequest{}
        for field, msg := range violations {
            badRequest.FieldViolations = append(
                badRequest.FieldViolations,
                &errdetails.BadRequest_FieldViolation{
                    Field:       field,
                    Description: msg,
                },
            )
        }
        st := status.New(codes.InvalidArgument, "invalid request")
        st, _ = st.WithDetails(badRequest)
        return nil, st.Err()
    }
    // Create user...
}
```

Client-side error handling:
```go
resp, err := client.CreateUser(ctx, req)
if err != nil {
    st := status.Convert(err)
    for _, detail := range st.Details() {
        switch t := detail.(type) {
        case *errdetails.BadRequest:
            for _, violation := range t.FieldViolations {
                fmt.Printf("Invalid field %s: %s\n",
                    violation.Field, violation.Description)
            }
        }
    }
}
```
Error Propagation
```go
func (s *server) ProcessOrder(ctx context.Context, req *pb.OrderRequest) (*pb.OrderResponse, error) {
    // Call inventory service
    inventory, err := s.inventoryClient.CheckInventory(ctx, &pb.InventoryRequest{
        ProductId: req.ProductId,
    })
    if err != nil {
        // Propagate error with additional context
        st := status.Convert(err)
        return nil, status.Errorf(st.Code(),
            "inventory check failed: %v", st.Message())
    }
    // Continue processing...
}
```
Retry Logic
```go
import (
    "context"
    "math"
    "time"

    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

func CallWithRetry(ctx context.Context, maxRetries int, fn func() error) error {
    var err error
    for i := 0; i < maxRetries; i++ {
        err = fn()
        if err == nil {
            return nil
        }
        // Check if error is retryable
        st := status.Convert(err)
        if !isRetryable(st.Code()) {
            return err
        }
        // Exponential backoff
        backoff := time.Duration(math.Pow(2, float64(i))) * time.Second
        time.Sleep(backoff)
    }
    return err
}

func isRetryable(code codes.Code) bool {
    return code == codes.Unavailable ||
        code == codes.DeadlineExceeded ||
        code == codes.ResourceExhausted
}
```
Best Practices
1. Schema Evolution
DO:
- Always use `syntax = "proto3"`
- Never reuse field numbers
- Use `reserved` for deprecated fields
- Add new fields with new numbers
- Use optional wrappers for nullable fields
```protobuf
message User {
  int32 id = 1;
  string name = 2;
  // string age = 3; // Deprecated
  reserved 3;
  reserved "age";
  string email = 4;
  google.protobuf.Int32Value phone = 5; // Optional
}
```

DON'T:
- Change field types
- Reuse field numbers
- Remove fields without reserving numbers
- Change message names without aliases
2. Performance Optimization
Connection Management:

```go
// Reuse connections
var conn *grpc.ClientConn
var once sync.Once

func getConnection() *grpc.ClientConn {
    once.Do(func() {
        var err error
        conn, err = grpc.Dial(
            address,
            // Dial requires transport credentials; use real TLS creds in production
            grpc.WithTransportCredentials(insecure.NewCredentials()),
            grpc.WithKeepaliveParams(keepalive.ClientParameters{
                Time:                10 * time.Second,
                Timeout:             3 * time.Second,
                PermitWithoutStream: true,
            }),
        )
        if err != nil {
            log.Fatal(err)
        }
    })
    return conn
}
```

Connection Pooling:
```go
type ConnectionPool struct {
    connections []*grpc.ClientConn
    next        uint32
}

func (p *ConnectionPool) GetConnection() *grpc.ClientConn {
    n := atomic.AddUint32(&p.next, 1)
    return p.connections[n%uint32(len(p.connections))]
}
```

Streaming for Large Data:

```protobuf
// Instead of this:
rpc GetAllUsers(Empty) returns (UserList); // Large response

// Use this:
rpc ListUsers(ListUsersRequest) returns (stream User); // Streamed
```
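The round-robin selection in the `ConnectionPool` shown earlier is easiest to verify in isolation. A stdlib-only sketch using a generic pool (the `Pool` type and constructor are illustrative, not a grpc API — with gRPC the members would be `[]*grpc.ClientConn`):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Pool hands out members round-robin; safe for concurrent use.
type Pool[T any] struct {
	members []T
	next    uint32
}

func NewPool[T any](members []T) *Pool[T] {
	return &Pool[T]{members: members}
}

func (p *Pool[T]) Get() T {
	// Atomic increment makes concurrent callers spread across members.
	n := atomic.AddUint32(&p.next, 1)
	return p.members[int(n)%len(p.members)]
}

func main() {
	// Strings stand in for connections to keep the sketch runnable.
	p := NewPool([]string{"conn-a", "conn-b", "conn-c"})
	fmt.Println(p.Get(), p.Get(), p.Get(), p.Get()) // conn-b conn-c conn-a conn-b
}
```

Because `next` starts at zero and is incremented before indexing, the first member handed out is index 1; the distribution is still uniform, which is what matters for load spreading.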
3. Security Best Practices
TLS Configuration:

```go
// Server-side
creds, err := credentials.NewServerTLSFromFile(certFile, keyFile)
server := grpc.NewServer(grpc.Creds(creds))

// Client-side
creds, err := credentials.NewClientTLSFromFile(certFile, "")
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(creds))
```

Mutual TLS (mTLS):
```go
cert, err := tls.LoadX509KeyPair(certFile, keyFile)

certPool := x509.NewCertPool()
ca, err := os.ReadFile(caFile) // ioutil.ReadFile is deprecated
certPool.AppendCertsFromPEM(ca)

creds := credentials.NewTLS(&tls.Config{
    Certificates: []tls.Certificate{cert},
    ClientAuth:   tls.RequireAndVerifyClientCert,
    ClientCAs:    certPool,
})
server := grpc.NewServer(grpc.Creds(creds))
```

Token Authentication:
```go
type tokenAuth struct {
    token string
}

func (t tokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
    return map[string]string{
        "authorization": "Bearer " + t.token,
    }, nil
}

func (t tokenAuth) RequireTransportSecurity() bool {
    return true
}

// Usage
conn, err := grpc.Dial(
    address,
    grpc.WithPerRPCCredentials(tokenAuth{token: "my-token"}),
)
```
4. Timeout and Deadline Management
```go
// Set deadline for request
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

resp, err := client.GetUser(ctx, &pb.GetUserRequest{Id: 123})
if err != nil {
    if status.Code(err) == codes.DeadlineExceeded {
        log.Println("Request timed out")
    }
}
```

Server-side deadline propagation:
```go
func (s *server) ComplexOperation(ctx context.Context, req *pb.Request) (*pb.Response, error) {
    // Check if deadline is already exceeded
    deadline, ok := ctx.Deadline()
    if ok && time.Now().After(deadline) {
        return nil, status.Error(codes.DeadlineExceeded, "deadline exceeded")
    }

    // Propagate context to downstream calls
    user, err := s.userClient.GetUser(ctx, &pb.GetUserRequest{Id: req.UserId})
    if err != nil {
        return nil, err
    }

    // Continue with remaining time...
}
```
5. Monitoring and Observability
Prometheus Metrics:

```go
import "github.com/grpc-ecosystem/go-grpc-prometheus"

// Server metrics
grpcMetrics := grpc_prometheus.NewServerMetrics()
server := grpc.NewServer(
    grpc.UnaryInterceptor(grpcMetrics.UnaryServerInterceptor()),
    grpc.StreamInterceptor(grpcMetrics.StreamServerInterceptor()),
)
grpcMetrics.InitializeMetrics(server)

// Expose metrics
http.Handle("/metrics", promhttp.Handler())
```
6. Graceful Shutdown
```go
server := grpc.NewServer()
// Register services...

go func() {
    if err := server.Serve(listener); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}()

// Wait for interrupt signal
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
<-quit

log.Println("Shutting down server...")

// Graceful shutdown
server.GracefulStop()
log.Println("Server stopped")
```
7. Service Versioning
Package-based versioning (the proto package determines the RPC path, so v1 and v2 can coexist):

```protobuf
// api/v1/user.proto
package api.v1;

service UserServiceV1 {
  rpc GetUser(GetUserRequest) returns (User);
}

// api/v2/user.proto
package api.v2;

service UserServiceV2 {
  rpc GetUser(GetUserRequest) returns (User);
}
```

Field-based versioning:
```protobuf
message User {
  int32 id = 1;
  string name = 2;
  string email = 3;
  // v2 additions
  string phone = 4;
  Address address = 5;
}
```
protobuf
package api.v1;
service UserServiceV1 {
rpc GetUser(GetUserRequest) returns (User);
}
package api.v2;
service UserServiceV2 {
rpc GetUser(GetUserRequest) returns (User);
}基于字段的版本化:
protobuf
message User {
int32 id = 1;
string name = 2;
string email = 3;
// v2新增字段
string phone = 4;
Address address = 5;
}8. Testing Best Practices
Unit Testing with Mocks:

```go
type mockUserClient struct {
    pb.UserServiceClient
    getUserFunc func(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error)
}

func (m *mockUserClient) GetUser(ctx context.Context, req *pb.GetUserRequest, opts ...grpc.CallOption) (*pb.User, error) {
    return m.getUserFunc(ctx, req)
}

func TestOrderService(t *testing.T) {
    mockClient := &mockUserClient{
        getUserFunc: func(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
            return &pb.User{Id: 1, Name: "Test User"}, nil
        },
    }
    // Test with mock...
}
```

Integration Testing:
```go
func TestIntegration(t *testing.T) {
    // Start test server
    lis, err := net.Listen("tcp", ":0")
    require.NoError(t, err)

    server := grpc.NewServer()
    pb.RegisterUserServiceServer(server, &userServer{})
    go server.Serve(lis)
    defer server.Stop()

    // Connect client (grpc.WithInsecure is deprecated)
    conn, err := grpc.Dial(lis.Addr().String(),
        grpc.WithTransportCredentials(insecure.NewCredentials()))
    require.NoError(t, err)
    defer conn.Close()

    client := pb.NewUserServiceClient(conn)
    // Test requests...
}
```
Production Deployment Patterns
Docker Deployment
Dockerfile:

```dockerfile
FROM golang:1.21-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o server ./cmd/server

FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/
COPY --from=builder /app/server .
COPY --from=builder /app/proto ./proto
EXPOSE 50051
CMD ["./server"]
```
Kubernetes Deployment
deployment.yaml:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: grpc-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: grpc-service
  template:
    metadata:
      labels:
        app: grpc-service
    spec:
      containers:
        - name: grpc-service
          image: grpc-service:latest
          ports:
            - containerPort: 50051
              name: grpc
              protocol: TCP
          env:
            - name: PORT
              value: "50051"
          livenessProbe:
            exec:
              command: ["/bin/grpc_health_probe", "-addr=:50051"]
            initialDelaySeconds: 10
          readinessProbe:
            exec:
              command: ["/bin/grpc_health_probe", "-addr=:50051"]
            initialDelaySeconds: 5
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:
  name: grpc-service
spec:
  selector:
    app: grpc-service
  ports:
    - port: 50051
      targetPort: 50051
      protocol: TCP
      name: grpc
  type: ClusterIP
```
Service Mesh Integration (Istio)
VirtualService for traffic routing:

```yaml
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: grpc-service
spec:
  hosts:
    - grpc-service
  http:
    - match:
        - headers:
            version:
              exact: v2
      route:
        - destination:
            host: grpc-service
            subset: v2
    - route:
        - destination:
            host: grpc-service
            subset: v1
```
Common Patterns and Anti-Patterns
✅ DO:
- Use streaming for large datasets
- Implement proper error handling with status codes
- Add interceptors for cross-cutting concerns
- Use connection pooling for high-throughput clients
- Implement health checks
- Set appropriate timeouts and deadlines
- Use TLS in production
- Version your APIs
- Monitor with metrics and tracing
- Test with integration tests
❌ DON'T:
- Don't use unary RPCs for large datasets - Use streaming instead
- Don't ignore context cancellation - Always check context.Done()
- Don't create new connections per request - Reuse connections
- Don't skip authentication/authorization - Always validate
- Don't forget graceful shutdown - Handle SIGTERM properly
- Don't hardcode endpoints - Use service discovery
- Don't ignore errors - Handle all error cases
- Don't use blocking operations without timeouts - Always set deadlines
- Don't skip health checks - Implement liveness/readiness probes
- Don't deploy without monitoring - Add metrics and logging
Skill Version: 1.0.0
Last Updated: October 2025
Skill Category: Microservices, gRPC, Distributed Systems, API Design
Compatible With: Go, Python, Node.js, Java, C++, C#, Ruby, and more