grpc-microservices

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

gRPC Microservices

gRPC 微服务

A comprehensive skill for building high-performance, type-safe microservices using gRPC and Protocol Buffers. This skill covers service design, all streaming patterns, interceptors, load balancing, error handling, and production deployment patterns for distributed systems.
这是一项使用gRPC和Protocol Buffers构建高性能、类型安全微服务的全面技能。本技能涵盖分布式系统的服务设计、所有流模式、拦截器、负载均衡、错误处理以及生产部署模式。

When to Use This Skill

何时使用本技能

Use this skill when:
  • Building microservices that require high-performance, low-latency communication
  • Implementing real-time data streaming between services
  • Designing type-safe APIs with strong contracts using Protocol Buffers
  • Creating polyglot systems where services are written in different languages
  • Building distributed systems requiring bidirectional streaming
  • Implementing service meshes with advanced routing and observability
  • Designing APIs that need to evolve with backward/forward compatibility
  • Creating internal APIs where performance and type safety are critical
  • Building event-driven architectures with streaming data pipelines
  • Implementing client-server systems with push capabilities (server streaming)
  • Designing systems requiring efficient binary serialization
  • Building microservices requiring automatic code generation for multiple languages
在以下场景中使用本技能:
  • 构建需要高性能、低延迟通信的微服务
  • 实现服务间的实时数据流
  • 使用Protocol Buffers设计具有强契约的类型安全API
  • 创建由不同语言编写服务的多语言系统
  • 构建需要双向流的分布式系统
  • 实现具备高级路由和可观测性的服务网格
  • 设计需要向后/向前兼容演进的API
  • 构建对性能和类型安全要求严格的内部API
  • 构建带有流数据管道的事件驱动架构
  • 实现具备推送能力的客户端-服务器系统(服务器流)
  • 设计需要高效二进制序列化的系统
  • 构建需要为多语言自动生成代码的微服务

Core Concepts

核心概念

gRPC Fundamentals

gRPC 基础

gRPC is a modern open-source RPC framework that can run anywhere. It enables client and server applications to communicate transparently and makes it easier to build connected systems.
Key Characteristics:
  • HTTP/2 based: Multiplexing, server push, header compression
  • Protocol Buffers: Efficient binary serialization format
  • Streaming: Bidirectional streaming support built-in
  • Code Generation: Auto-generate client/server code in 10+ languages
  • Deadlines/Timeouts: First-class timeout support
  • Cancellation: Propagate cancellation across services
  • Interceptors: Middleware pattern for cross-cutting concerns
gRPC是一个现代的开源RPC框架,可在任何环境中运行。它使客户端和服务器应用能够透明地通信,简化互联系统的构建。
关键特性:
  • 基于HTTP/2:多路复用、服务器推送、头部压缩
  • Protocol Buffers:高效的二进制序列化格式
  • 流支持:内置双向流功能
  • 代码生成:自动为10+种语言生成客户端/服务器代码
  • 截止时间/超时:原生支持超时设置
  • 取消机制:跨服务传播取消信号
  • 拦截器:用于横切关注点的中间件模式

Protocol Buffers (protobuf)

Protocol Buffers(protobuf)

Protocol Buffers is a language-neutral, platform-neutral extensible mechanism for serializing structured data.
Advantages:
  • Compact: 3-10x smaller than JSON
  • Fast: 20-100x faster to serialize/deserialize than JSON
  • Type-safe: Strongly typed schema with validation
  • Backward/Forward Compatible: Evolve schemas safely
  • Language Support: Official support for 10+ languages
  • Self-documenting: Schema serves as documentation
Basic Syntax:
protobuf
syntax = "proto3";

message User {
  int32 id = 1;
  string name = 2;
  string email = 3;
}
Protocol Buffers是一种与语言、平台无关的可扩展结构化数据序列化机制。
优势:
  • 紧凑:比JSON小3-10倍
  • 快速:序列化/反序列化速度比JSON快20-100倍
  • 类型安全:带有验证的强类型schema
  • 向后/向前兼容:安全演进schema
  • 多语言支持:官方支持10+种语言
  • 自文档化:schema可作为文档使用
基础语法:
protobuf
syntax = "proto3";

message User {
  int32 id = 1;
  string name = 2;
  string email = 3;
}

Service Definitions

服务定义

gRPC services are defined in
.proto
files and specify available methods and their input/output types.
Basic Service:
protobuf
service UserService {
  rpc GetUser(GetUserRequest) returns (GetUserResponse);
  rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
}
gRPC服务在
.proto
文件中定义,指定可用方法及其输入/输出类型。
基础服务:
protobuf
service UserService {
  rpc GetUser(GetUserRequest) returns (GetUserResponse);
  rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
}

Four Types of RPC Methods

四种RPC方法类型

1. Unary RPC (Request-Response)

1. 一元RPC(请求-响应)

Simple request-response pattern, like a traditional REST API call.
protobuf
rpc GetUser(GetUserRequest) returns (GetUserResponse);
Use Cases:
  • CRUD operations
  • Simple queries
  • Synchronous operations
  • Traditional request-response patterns
简单的请求-响应模式,类似传统REST API调用。
protobuf
rpc GetUser(GetUserRequest) returns (GetUserResponse);
适用场景:
  • CRUD操作
  • 简单查询
  • 同步操作
  • 传统请求-响应模式

2. Server Streaming RPC

2. 服务器流RPC

Client sends one request, server returns a stream of responses.
protobuf
rpc ListUsers(ListUsersRequest) returns (stream User);
Use Cases:
  • Paginated results
  • Real-time updates
  • Server-side event push
  • Large dataset downloads
客户端发送一个请求,服务器返回响应流。
protobuf
rpc ListUsers(ListUsersRequest) returns (stream User);
适用场景:
  • 分页结果
  • 实时更新
  • 服务器端事件推送
  • 大型数据集下载

3. Client Streaming RPC

3. 客户端流RPC

Client sends a stream of requests, server returns one response.
protobuf
rpc CreateUsers(stream CreateUserRequest) returns (CreateUsersResponse);
Use Cases:
  • Bulk uploads
  • Batch processing
  • Client-side aggregation
  • File uploads in chunks
客户端发送请求流,服务器返回一个响应。
protobuf
rpc CreateUsers(stream CreateUserRequest) returns (CreateUsersResponse);
适用场景:
  • 批量上传
  • 批处理
  • 客户端聚合
  • 分块文件上传

4. Bidirectional Streaming RPC

4. 双向流RPC

Both client and server send streams of messages independently.
protobuf
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
Use Cases:
  • Real-time chat applications
  • Live collaboration
  • Gaming (real-time state sync)
  • IoT bidirectional communication
客户端和服务器独立发送消息流。
protobuf
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
适用场景:
  • 实时聊天应用
  • 在线协作
  • 游戏(实时状态同步)
  • IoT双向通信

Protobuf Schema Design

Protobuf Schema设计

Message Design Best Practices

消息设计最佳实践

1. Use Explicit Field Numbers
Field numbers are critical for backward compatibility and should never be reused.
protobuf
message User {
  int32 id = 1;           // Never change this number
  string name = 2;        // Never change this number
  string email = 3;       // Never change this number
  // int32 age = 4;       // DEPRECATED - don't reuse 4
  string phone = 5;       // New field - use next available
}
2. Use Enumerations for Fixed Sets
protobuf
enum UserRole {
  USER_ROLE_UNSPECIFIED = 0;  // Always have a zero value
  USER_ROLE_ADMIN = 1;
  USER_ROLE_MODERATOR = 2;
  USER_ROLE_MEMBER = 3;
}

message User {
  int32 id = 1;
  string name = 2;
  UserRole role = 3;
}
3. Use Nested Messages for Complex Types
protobuf
message User {
  int32 id = 1;
  string name = 2;

  message Address {
    string street = 1;
    string city = 2;
    string state = 3;
    string zip = 4;
  }

  Address address = 3;
  repeated Address additional_addresses = 4;
}
4. Use
repeated
for Arrays
protobuf
message UserList {
  repeated User users = 1;
}

message User {
  int32 id = 1;
  string name = 2;
  repeated string tags = 3;
}
5. Use
oneof
for Union Types
protobuf
message SearchRequest {
  string query = 1;

  oneof filter {
    string category = 2;
    int32 user_id = 3;
    string tag = 4;
  }
}
6. Use
google.protobuf
Well-Known Types
protobuf
import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";

message Event {
  string id = 1;
  string name = 2;
  google.protobuf.Timestamp created_at = 3;
  google.protobuf.Duration duration = 4;
  google.protobuf.Int32Value optional_count = 5;
}
1. 使用明确的字段编号
字段编号对向后兼容性至关重要,绝不能重复使用。
protobuf
message User {
  int32 id = 1;           // 永远不要更改此编号
  string name = 2;        // 永远不要更改此编号
  string email = 3;       // 永远不要更改此编号
  // int32 age = 4;       // 已废弃 - 不要复用编号4
  string phone = 5;       // 新字段 - 使用下一个可用编号
}
2. 对固定集合使用枚举
protobuf
enum UserRole {
  USER_ROLE_UNSPECIFIED = 0;  // 始终保留零值
  USER_ROLE_ADMIN = 1;
  USER_ROLE_MODERATOR = 2;
  USER_ROLE_MEMBER = 3;
}

message User {
  int32 id = 1;
  string name = 2;
  UserRole role = 3;
}
3. 对复杂类型使用嵌套消息
protobuf
message User {
  int32 id = 1;
  string name = 2;

  message Address {
    string street = 1;
    string city = 2;
    string state = 3;
    string zip = 4;
  }

  Address address = 3;
  repeated Address additional_addresses = 4;
}
4. 使用
repeated
表示数组
protobuf
message UserList {
  repeated User users = 1;
}

message User {
  int32 id = 1;
  string name = 2;
  repeated string tags = 3;
}
5. 使用
oneof
表示联合类型
protobuf
message SearchRequest {
  string query = 1;

  oneof filter {
    string category = 2;
    int32 user_id = 3;
    string tag = 4;
  }
}
6. 使用
google.protobuf
知名类型
protobuf
import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";

message Event {
  string id = 1;
  string name = 2;
  google.protobuf.Timestamp created_at = 3;
  google.protobuf.Duration duration = 4;
  google.protobuf.Int32Value optional_count = 5;
}

Service Design Patterns

服务设计模式

1. Resource-Oriented Design
Follow RESTful principles adapted for RPC:
protobuf
service UserService {
  // Get single resource
  rpc GetUser(GetUserRequest) returns (User);

  // List resources
  rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);

  // Create resource
  rpc CreateUser(CreateUserRequest) returns (User);

  // Update resource
  rpc UpdateUser(UpdateUserRequest) returns (User);

  // Delete resource
  rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty);
}
2. Pagination Pattern
protobuf
message ListUsersRequest {
  int32 page_size = 1;
  string page_token = 2;
  string filter = 3;
}

message ListUsersResponse {
  repeated User users = 1;
  string next_page_token = 2;
  int32 total_count = 3;
}
3. Batch Operations Pattern
protobuf
message BatchGetUsersRequest {
  repeated int32 user_ids = 1;
}

message BatchGetUsersResponse {
  map<int32, User> users = 1;
  repeated int32 not_found = 2;
}
4. Long-Running Operations Pattern
protobuf
import "google/longrunning/operations.proto";

service BatchJobService {
  rpc ProcessBatch(BatchRequest) returns (google.longrunning.Operation);
  rpc GetOperation(GetOperationRequest) returns (google.longrunning.Operation);
}
1. 面向资源的设计
适配RPC的RESTful原则:
protobuf
service UserService {
  // 获取单个资源
  rpc GetUser(GetUserRequest) returns (User);

  // 列出资源
  rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);

  // 创建资源
  rpc CreateUser(CreateUserRequest) returns (User);

  // 更新资源
  rpc UpdateUser(UpdateUserRequest) returns (User);

  // 删除资源
  rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty);
}
2. 分页模式
protobuf
message ListUsersRequest {
  int32 page_size = 1;
  string page_token = 2;
  string filter = 3;
}

message ListUsersResponse {
  repeated User users = 1;
  string next_page_token = 2;
  int32 total_count = 3;
}
3. 批量操作模式
protobuf
message BatchGetUsersRequest {
  repeated int32 user_ids = 1;
}

message BatchGetUsersResponse {
  map<int32, User> users = 1;
  repeated int32 not_found = 2;
}
4. 长时运行操作模式
protobuf
import "google/longrunning/operations.proto";

service BatchJobService {
  rpc ProcessBatch(BatchRequest) returns (google.longrunning.Operation);
  rpc GetOperation(GetOperationRequest) returns (google.longrunning.Operation);
}

Streaming Patterns

流模式

Server Streaming Patterns

服务器流模式

1. Pagination Streaming
Stream large result sets efficiently:
protobuf
service ProductService {
  rpc SearchProducts(SearchRequest) returns (stream Product);
}

message SearchRequest {
  string query = 1;
  int32 limit = 2;
}
Implementation (Go):
go
func (s *server) SearchProducts(req *pb.SearchRequest, stream pb.ProductService_SearchProductsServer) error {
    products := s.db.Search(req.Query, req.Limit)

    for _, product := range products {
        if err := stream.Send(&product); err != nil {
            return err
        }
    }

    return nil
}
2. Real-Time Updates
Push updates to clients as they occur:
protobuf
service EventService {
  rpc SubscribeToEvents(SubscribeRequest) returns (stream Event);
}

message SubscribeRequest {
  repeated string event_types = 1;
  google.protobuf.Timestamp since = 2;
}
3. Log Tailing
Stream logs or audit trails:
protobuf
service LogService {
  rpc TailLogs(TailRequest) returns (stream LogEntry);
}

message TailRequest {
  string service_name = 1;
  string level = 2;
  int32 lines = 3;
}
1. 分页流
高效流式传输大型结果集:
protobuf
service ProductService {
  rpc SearchProducts(SearchRequest) returns (stream Product);
}

message SearchRequest {
  string query = 1;
  int32 limit = 2;
}
Go实现:
go
func (s *server) SearchProducts(req *pb.SearchRequest, stream pb.ProductService_SearchProductsServer) error {
    products := s.db.Search(req.Query, req.Limit)

    for _, product := range products {
        if err := stream.Send(&product); err != nil {
            return err
        }
    }

    return nil
}
2. 实时更新
事件发生时向客户端推送更新:
protobuf
service EventService {
  rpc SubscribeToEvents(SubscribeRequest) returns (stream Event);
}

message SubscribeRequest {
  repeated string event_types = 1;
  google.protobuf.Timestamp since = 2;
}
3. 日志追踪
流式传输日志或审计轨迹:
protobuf
service LogService {
  rpc TailLogs(TailRequest) returns (stream LogEntry);
}

message TailRequest {
  string service_name = 1;
  string level = 2;
  int32 lines = 3;
}

Client Streaming Patterns

客户端流模式

1. Bulk Upload
Client streams data, server processes and returns summary:
protobuf
service UploadService {
  rpc UploadImages(stream ImageChunk) returns (UploadSummary);
}

message ImageChunk {
  string filename = 1;
  bytes data = 2;
  int32 chunk_number = 3;
}

message UploadSummary {
  int32 total_images = 1;
  int64 total_bytes = 2;
  repeated string uploaded_filenames = 3;
}
Implementation (Go):
go
func (s *server) UploadImages(stream pb.UploadService_UploadImagesServer) error {
    var count int32
    var totalBytes int64
    var filenames []string

    for {
        chunk, err := stream.Recv()
        if err == io.EOF {
            return stream.SendAndClose(&pb.UploadSummary{
                TotalImages: count,
                TotalBytes:  totalBytes,
                UploadedFilenames: filenames,
            })
        }
        if err != nil {
            return err
        }

        // Process chunk
        totalBytes += int64(len(chunk.Data))
        if chunk.ChunkNumber == 0 {
            count++
            filenames = append(filenames, chunk.Filename)
        }
    }
}
2. Aggregation
Client sends multiple data points, server aggregates:
protobuf
service AnalyticsService {
  rpc RecordMetrics(stream Metric) returns (AggregateResult);
}

message Metric {
  string name = 1;
  double value = 2;
  google.protobuf.Timestamp timestamp = 3;
}
1. 批量上传
客户端流式传输数据,服务器处理并返回摘要:
protobuf
service UploadService {
  rpc UploadImages(stream ImageChunk) returns (UploadSummary);
}

message ImageChunk {
  string filename = 1;
  bytes data = 2;
  int32 chunk_number = 3;
}

message UploadSummary {
  int32 total_images = 1;
  int64 total_bytes = 2;
  repeated string uploaded_filenames = 3;
}
Go实现:
go
func (s *server) UploadImages(stream pb.UploadService_UploadImagesServer) error {
    var count int32
    var totalBytes int64
    var filenames []string

    for {
        chunk, err := stream.Recv()
        if err == io.EOF {
            return stream.SendAndClose(&pb.UploadSummary{
                TotalImages: count,
                TotalBytes:  totalBytes,
                UploadedFilenames: filenames,
            })
        }
        if err != nil {
            return err
        }

        // 处理数据块
        totalBytes += int64(len(chunk.Data))
        if chunk.ChunkNumber == 0 {
            count++
            filenames = append(filenames, chunk.Filename)
        }
    }
}
2. 聚合操作
客户端发送多个数据点,服务器进行聚合:
protobuf
service AnalyticsService {
  rpc RecordMetrics(stream Metric) returns (AggregateResult);
}

message Metric {
  string name = 1;
  double value = 2;
  google.protobuf.Timestamp timestamp = 3;
}

Bidirectional Streaming Patterns

双向流模式

1. Chat Application
Real-time bidirectional communication:
protobuf
service ChatService {
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

message ChatMessage {
  string user_id = 1;
  string room_id = 2;
  string content = 3;
  google.protobuf.Timestamp timestamp = 4;
}
Implementation (Go):
go
func (s *server) Chat(stream pb.ChatService_ChatServer) error {
    // Create channel for this client
    clientID := uuid.New().String()
    msgChan := make(chan *pb.ChatMessage, 10)

    // Register client
    s.mu.Lock()
    s.clients[clientID] = msgChan
    s.mu.Unlock()

    defer func() {
        s.mu.Lock()
        delete(s.clients, clientID)
        close(msgChan)
        s.mu.Unlock()
    }()

    // Goroutine to send messages to client
    go func() {
        for msg := range msgChan {
            if err := stream.Send(msg); err != nil {
                return
            }
        }
    }()

    // Receive messages from client
    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }

        // Broadcast to all clients in room
        s.broadcast(msg)
    }
}
2. Live Collaboration
Real-time document editing:
protobuf
service CollaborationService {
  rpc Collaborate(stream DocumentEdit) returns (stream DocumentEdit);
}

message DocumentEdit {
  string document_id = 1;
  string user_id = 2;
  int32 position = 3;
  string content = 4;
  enum Operation {
    OPERATION_UNSPECIFIED = 0;
    OPERATION_INSERT = 1;
    OPERATION_DELETE = 2;
    OPERATION_UPDATE = 3;
  }
  Operation operation = 5;
}
3. Game State Synchronization
Real-time multiplayer game updates:
protobuf
service GameService {
  rpc PlayGame(stream GameAction) returns (stream GameState);
}

message GameAction {
  string player_id = 1;
  string game_id = 2;
  string action_type = 3;
  bytes action_data = 4;
}

message GameState {
  string game_id = 1;
  repeated PlayerState players = 2;
  bytes world_state = 3;
  google.protobuf.Timestamp timestamp = 4;
}
1. 聊天应用
实时双向通信:
protobuf
service ChatService {
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

message ChatMessage {
  string user_id = 1;
  string room_id = 2;
  string content = 3;
  google.protobuf.Timestamp timestamp = 4;
}
Go实现:
go
func (s *server) Chat(stream pb.ChatService_ChatServer) error {
    // 为此客户端创建通道
    clientID := uuid.New().String()
    msgChan := make(chan *pb.ChatMessage, 10)

    // 注册客户端
    s.mu.Lock()
    s.clients[clientID] = msgChan
    s.mu.Unlock()

    defer func() {
        s.mu.Lock()
        delete(s.clients, clientID)
        close(msgChan)
        s.mu.Unlock()
    }()

    // 向客户端发送消息的协程
    go func() {
        for msg := range msgChan {
            if err := stream.Send(msg); err != nil {
                return
            }
        }
    }()

    // 接收客户端消息
    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }

        // 向房间内所有客户端广播
        s.broadcast(msg)
    }
}
2. 在线协作
实时文档编辑:
protobuf
service CollaborationService {
  rpc Collaborate(stream DocumentEdit) returns (stream DocumentEdit);
}

message DocumentEdit {
  string document_id = 1;
  string user_id = 2;
  int32 position = 3;
  string content = 4;
  enum Operation {
    OPERATION_UNSPECIFIED = 0;
    OPERATION_INSERT = 1;
    OPERATION_DELETE = 2;
    OPERATION_UPDATE = 3;
  }
  Operation operation = 5;
}
3. 游戏状态同步
实时多人游戏更新:
protobuf
service GameService {
  rpc PlayGame(stream GameAction) returns (stream GameState);
}

message GameAction {
  string player_id = 1;
  string game_id = 2;
  string action_type = 3;
  bytes action_data = 4;
}

message GameState {
  string game_id = 1;
  repeated PlayerState players = 2;
  bytes world_state = 3;
  google.protobuf.Timestamp timestamp = 4;
}

Interceptors (Middleware)

拦截器(中间件)

Interceptors provide a way to add cross-cutting concerns to gRPC services.
拦截器为gRPC服务提供了添加横切关注点的方式。

Unary Interceptors

一元拦截器

Server-side Unary Interceptor:
go
func UnaryServerInterceptor() grpc.UnaryServerInterceptor {
    return func(
        ctx context.Context,
        req interface{},
        info *grpc.UnaryServerInfo,
        handler grpc.UnaryHandler,
    ) (interface{}, error) {
        // Pre-processing
        start := time.Now()
        log.Printf("Method: %s, Start: %v", info.FullMethod, start)

        // Call the handler
        resp, err := handler(ctx, req)

        // Post-processing
        duration := time.Since(start)
        log.Printf("Method: %s, Duration: %v, Error: %v",
            info.FullMethod, duration, err)

        return resp, err
    }
}

// Usage
server := grpc.NewServer(
    grpc.UnaryInterceptor(UnaryServerInterceptor()),
)
Client-side Unary Interceptor:
go
func UnaryClientInterceptor() grpc.UnaryClientInterceptor {
    return func(
        ctx context.Context,
        method string,
        req, reply interface{},
        cc *grpc.ClientConn,
        invoker grpc.UnaryInvoker,
        opts ...grpc.CallOption,
    ) error {
        start := time.Now()

        // Call the remote method
        err := invoker(ctx, method, req, reply, cc, opts...)

        log.Printf("Method: %s, Duration: %v, Error: %v",
            method, time.Since(start), err)

        return err
    }
}

// Usage
conn, err := grpc.Dial(
    address,
    grpc.WithUnaryInterceptor(UnaryClientInterceptor()),
)
服务器端一元拦截器:
go
func UnaryServerInterceptor() grpc.UnaryServerInterceptor {
    return func(
        ctx context.Context,
        req interface{},
        info *grpc.UnaryServerInfo,
        handler grpc.UnaryHandler,
    ) (interface{}, error) {
        // 预处理
        start := time.Now()
        log.Printf("Method: %s, Start: %v", info.FullMethod, start)

        // 调用处理函数
        resp, err := handler(ctx, req)

        // 后处理
        duration := time.Since(start)
        log.Printf("Method: %s, Duration: %v, Error: %v",
            info.FullMethod, duration, err)

        return resp, err
    }
}

// 使用方式
server := grpc.NewServer(
    grpc.UnaryInterceptor(UnaryServerInterceptor()),
)
客户端一元拦截器:
go
func UnaryClientInterceptor() grpc.UnaryClientInterceptor {
    return func(
        ctx context.Context,
        method string,
        req, reply interface{},
        cc *grpc.ClientConn,
        invoker grpc.UnaryInvoker,
        opts ...grpc.CallOption,
    ) error {
        start := time.Now()

        // 调用远程方法
        err := invoker(ctx, method, req, reply, cc, opts...)

        log.Printf("Method: %s, Duration: %v, Error: %v",
            method, time.Since(start), err)

        return err
    }
}

// 使用方式
conn, err := grpc.Dial(
    address,
    grpc.WithUnaryInterceptor(UnaryClientInterceptor()),
)

Streaming Interceptors

流拦截器

Server-side Stream Interceptor:
go
func StreamServerInterceptor() grpc.StreamServerInterceptor {
    return func(
        srv interface{},
        ss grpc.ServerStream,
        info *grpc.StreamServerInfo,
        handler grpc.StreamHandler,
    ) error {
        log.Printf("Stream started: %s", info.FullMethod)

        err := handler(srv, ss)

        log.Printf("Stream ended: %s, Error: %v", info.FullMethod, err)

        return err
    }
}
服务器端流拦截器:
go
func StreamServerInterceptor() grpc.StreamServerInterceptor {
    return func(
        srv interface{},
        ss grpc.ServerStream,
        info *grpc.StreamServerInfo,
        handler grpc.StreamHandler,
    ) error {
        log.Printf("Stream started: %s", info.FullMethod)

        err := handler(srv, ss)

        log.Printf("Stream ended: %s, Error: %v", info.FullMethod, err)

        return err
    }
}

Common Interceptor Patterns

常见拦截器模式

1. Authentication Interceptor

1. 认证拦截器

go
func AuthInterceptor(secret string) grpc.UnaryServerInterceptor {
    return func(
        ctx context.Context,
        req interface{},
        info *grpc.UnaryServerInfo,
        handler grpc.UnaryHandler,
    ) (interface{}, error) {
        // Extract metadata
        md, ok := metadata.FromIncomingContext(ctx)
        if !ok {
            return nil, status.Error(codes.Unauthenticated, "no metadata")
        }

        // Get authorization token
        tokens := md["authorization"]
        if len(tokens) == 0 {
            return nil, status.Error(codes.Unauthenticated, "no token")
        }

        // Validate token
        token := tokens[0]
        claims, err := validateJWT(token, secret)
        if err != nil {
            return nil, status.Error(codes.Unauthenticated, "invalid token")
        }

        // Add claims to context
        ctx = context.WithValue(ctx, "claims", claims)

        return handler(ctx, req)
    }
}
go
func AuthInterceptor(secret string) grpc.UnaryServerInterceptor {
    return func(
        ctx context.Context,
        req interface{},
        info *grpc.UnaryServerInfo,
        handler grpc.UnaryHandler,
    ) (interface{}, error) {
        // 提取元数据
        md, ok := metadata.FromIncomingContext(ctx)
        if !ok {
            return nil, status.Error(codes.Unauthenticated, "no metadata")
        }

        // 获取授权令牌
        tokens := md["authorization"]
        if len(tokens) == 0 {
            return nil, status.Error(codes.Unauthenticated, "no token")
        }

        // 验证令牌
        token := tokens[0]
        claims, err := validateJWT(token, secret)
        if err != nil {
            return nil, status.Error(codes.Unauthenticated, "invalid token")
        }

        // 将claims添加到上下文
        ctx = context.WithValue(ctx, "claims", claims)

        return handler(ctx, req)
    }
}

2. Logging Interceptor

2. 日志拦截器

go
func LoggingInterceptor(logger *log.Logger) grpc.UnaryServerInterceptor {
    return func(
        ctx context.Context,
        req interface{},
        info *grpc.UnaryServerInfo,
        handler grpc.UnaryHandler,
    ) (interface{}, error) {
        start := time.Now()

        // Get request ID from metadata
        requestID := getRequestID(ctx)

        logger.Printf("[%s] Request: %s", requestID, info.FullMethod)

        resp, err := handler(ctx, req)

        duration := time.Since(start)
        statusCode := status.Code(err)

        logger.Printf("[%s] Response: %s, Duration: %v, Status: %v",
            requestID, info.FullMethod, duration, statusCode)

        return resp, err
    }
}
go
func LoggingInterceptor(logger *log.Logger) grpc.UnaryServerInterceptor {
    return func(
        ctx context.Context,
        req interface{},
        info *grpc.UnaryServerInfo,
        handler grpc.UnaryHandler,
    ) (interface{}, error) {
        start := time.Now()

        // 从元数据中获取请求ID
        requestID := getRequestID(ctx)

        logger.Printf("[%s] Request: %s", requestID, info.FullMethod)

        resp, err := handler(ctx, req)

        duration := time.Since(start)
        statusCode := status.Code(err)

        logger.Printf("[%s] Response: %s, Duration: %v, Status: %v",
            requestID, info.FullMethod, duration, statusCode)

        return resp, err
    }
}

3. Rate Limiting Interceptor

3. 限流拦截器

go
func RateLimitInterceptor(limiter *rate.Limiter) grpc.UnaryServerInterceptor {
    return func(
        ctx context.Context,
        req interface{},
        info *grpc.UnaryServerInfo,
        handler grpc.UnaryHandler,
    ) (interface{}, error) {
        if !limiter.Allow() {
            return nil, status.Error(
                codes.ResourceExhausted,
                "rate limit exceeded",
            )
        }

        return handler(ctx, req)
    }
}
go
func RateLimitInterceptor(limiter *rate.Limiter) grpc.UnaryServerInterceptor {
    return func(
        ctx context.Context,
        req interface{},
        info *grpc.UnaryServerInfo,
        handler grpc.UnaryHandler,
    ) (interface{}, error) {
        if !limiter.Allow() {
            return nil, status.Error(
                codes.ResourceExhausted,
                "rate limit exceeded",
            )
        }

        return handler(ctx, req)
    }
}

4. Tracing Interceptor (OpenTelemetry)

4. 追踪拦截器(OpenTelemetry)

go
func TracingInterceptor(tracer trace.Tracer) grpc.UnaryServerInterceptor {
    return func(
        ctx context.Context,
        req interface{},
        info *grpc.UnaryServerInfo,
        handler grpc.UnaryHandler,
    ) (interface{}, error) {
        ctx, span := tracer.Start(ctx, info.FullMethod)
        defer span.End()

        // Add attributes
        span.SetAttributes(
            attribute.String("rpc.method", info.FullMethod),
            attribute.String("rpc.service", "MyService"),
        )

        resp, err := handler(ctx, req)

        if err != nil {
            span.RecordError(err)
            span.SetStatus(codes2.Error, err.Error())
        } else {
            span.SetStatus(codes2.Ok, "")
        }

        return resp, err
    }
}
go
func TracingInterceptor(tracer trace.Tracer) grpc.UnaryServerInterceptor {
    return func(
        ctx context.Context,
        req interface{},
        info *grpc.UnaryServerInfo,
        handler grpc.UnaryHandler,
    ) (interface{}, error) {
        ctx, span := tracer.Start(ctx, info.FullMethod)
        defer span.End()

        // 添加属性
        span.SetAttributes(
            attribute.String("rpc.method", info.FullMethod),
            attribute.String("rpc.service", "MyService"),
        )

        resp, err := handler(ctx, req)

        if err != nil {
            span.RecordError(err)
            span.SetStatus(codes2.Error, err.Error())
        } else {
            span.SetStatus(codes2.Ok, "")
        }

        return resp, err
    }
}

5. Error Recovery Interceptor

5. 错误恢复拦截器

go
func RecoveryInterceptor() grpc.UnaryServerInterceptor {
    return func(
        ctx context.Context,
        req interface{},
        info *grpc.UnaryServerInfo,
        handler grpc.UnaryHandler,
    ) (resp interface{}, err error) {
        defer func() {
            if r := recover(); r != nil {
                log.Printf("Panic recovered: %v\n%s", r, debug.Stack())
                err = status.Error(codes.Internal, "internal server error")
            }
        }()

        return handler(ctx, req)
    }
}
go
func RecoveryInterceptor() grpc.UnaryServerInterceptor {
    return func(
        ctx context.Context,
        req interface{},
        info *grpc.UnaryServerInfo,
        handler grpc.UnaryHandler,
    ) (resp interface{}, err error) {
        defer func() {
            if r := recover(); r != nil {
                log.Printf("Panic recovered: %v\n%s", r, debug.Stack())
                err = status.Error(codes.Internal, "internal server error")
            }
        }()

        return handler(ctx, req)
    }
}

Chaining Multiple Interceptors

链式调用多个拦截器

go
server := grpc.NewServer(
    grpc.ChainUnaryInterceptor(
        RecoveryInterceptor(),
        LoggingInterceptor(logger),
        TracingInterceptor(tracer),
        AuthInterceptor(jwtSecret),
        RateLimitInterceptor(limiter),
    ),
    grpc.ChainStreamInterceptor(
        StreamRecoveryInterceptor(),
        StreamLoggingInterceptor(logger),
    ),
)
go
server := grpc.NewServer(
    grpc.ChainUnaryInterceptor(
        RecoveryInterceptor(),
        LoggingInterceptor(logger),
        TracingInterceptor(tracer),
        AuthInterceptor(jwtSecret),
        RateLimitInterceptor(limiter),
    ),
    grpc.ChainStreamInterceptor(
        StreamRecoveryInterceptor(),
        StreamLoggingInterceptor(logger),
    ),
)

Load Balancing

负载均衡

Client-Side Load Balancing

客户端负载均衡

gRPC provides built-in client-side load balancing with multiple policies.
1. Round Robin
go
import "google.golang.org/grpc/balancer/roundrobin"

conn, err := grpc.Dial(
    "dns:///my-service.example.com:50051",
    grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
    grpc.WithInsecure(),
)
2. Pick First (Default)
go
conn, err := grpc.Dial(
    "dns:///my-service.example.com:50051",
    grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"pick_first"}`),
    grpc.WithInsecure(),
)
3. Custom Resolver
Implement custom service discovery:
go
type exampleResolver struct {
    target     resolver.Target
    cc         resolver.ClientConn
    addrsStore map[string][]string
}

func (r *exampleResolver) ResolveNow(resolver.ResolveNowOptions) {
    // Discover service addresses
    addresses := r.discoverServices()

    var addrs []resolver.Address
    for _, addr := range addresses {
        addrs = append(addrs, resolver.Address{Addr: addr})
    }

    r.cc.UpdateState(resolver.State{Addresses: addrs})
}

func init() {
    resolver.Register(&exampleResolverBuilder{})
}
gRPC提供内置的客户端负载均衡,支持多种策略。
1. 轮询
go
import "google.golang.org/grpc/balancer/roundrobin"

conn, err := grpc.Dial(
    "dns:///my-service.example.com:50051",
    grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
    grpc.WithInsecure(),
)
2. 优先选择(默认)
go
conn, err := grpc.Dial(
    "dns:///my-service.example.com:50051",
    grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"pick_first"}`),
    grpc.WithInsecure(),
)
3. 自定义解析器
实现自定义服务发现:
go
type exampleResolver struct {
    target     resolver.Target
    cc         resolver.ClientConn
    addrsStore map[string][]string
}

func (r *exampleResolver) ResolveNow(resolver.ResolveNowOptions) {
    // 发现服务地址
    addresses := r.discoverServices()

    var addrs []resolver.Address
    for _, addr := range addresses {
        addrs = append(addrs, resolver.Address{Addr: addr})
    }

    r.cc.UpdateState(resolver.State{Addresses: addrs})
}

func init() {
    resolver.Register(&exampleResolverBuilder{})
}

Load Balancing with Service Mesh

服务网格负载均衡

Kubernetes with Service Mesh (Istio/Linkerd):
yaml
apiVersion: v1
kind: Service
metadata:
  name: grpc-service
spec:
  selector:
    app: grpc-app
  ports:
  - name: grpc
    port: 50051
    targetPort: 50051
    protocol: TCP
  type: ClusterIP
---
apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
  name: grpc-service
spec:
  host: grpc-service
  trafficPolicy:
    loadBalancer:
      simple: ROUND_ROBIN
    connectionPool:
      http:
        http2MaxRequests: 1000
        maxRequestsPerConnection: 10
Kubernetes + 服务网格(Istio/Linkerd):
yaml
apiVersion: v1
kind: Service
metadata:
  name: grpc-service
spec:
  selector:
    app: grpc-app
  ports:
  - name: grpc
    port: 50051
    targetPort: 50051
    protocol: TCP
  type: ClusterIP
---
apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
  name: grpc-service
spec:
  host: grpc-service
  trafficPolicy:
    loadBalancer:
      simple: ROUND_ROBIN
    connectionPool:
      http:
        http2MaxRequests: 1000
        maxRequestsPerConnection: 10

Health Checking

健康检查

Implement health check service:
protobuf
service Health {
  rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
  rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}

message HealthCheckRequest {
  string service = 1;
}

message HealthCheckResponse {
  enum ServingStatus {
    UNKNOWN = 0;
    SERVING = 1;
    NOT_SERVING = 2;
    SERVICE_UNKNOWN = 3;
  }
  ServingStatus status = 1;
}
Implementation:
go
import "google.golang.org/grpc/health"
import healthpb "google.golang.org/grpc/health/grpc_health_v1"

healthServer := health.NewServer()
healthpb.RegisterHealthServer(grpcServer, healthServer)

// Set service status
healthServer.SetServingStatus("UserService", healthpb.HealthCheckResponse_SERVING)
实现健康检查服务:
protobuf
service Health {
  rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
  rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}

message HealthCheckRequest {
  string service = 1;
}

message HealthCheckResponse {
  enum ServingStatus {
    UNKNOWN = 0;
    SERVING = 1;
    NOT_SERVING = 2;
    SERVICE_UNKNOWN = 3;
  }
  ServingStatus status = 1;
}
实现:
go
import "google.golang.org/grpc/health"
import healthpb "google.golang.org/grpc/health/grpc_health_v1"

healthServer := health.NewServer()
healthpb.RegisterHealthServer(grpcServer, healthServer)

// 设置服务状态
healthServer.SetServingStatus("UserService", healthpb.HealthCheckResponse_SERVING)

Error Handling

错误处理

gRPC Status Codes

gRPC状态码

gRPC uses standardized status codes for error handling:
go
import "google.golang.org/grpc/codes"
import "google.golang.org/grpc/status"

// Return errors with appropriate codes
func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
    if req.Id <= 0 {
        return nil, status.Error(codes.InvalidArgument, "id must be positive")
    }

    user, err := s.db.GetUser(req.Id)
    if err == sql.ErrNoRows {
        return nil, status.Error(codes.NotFound, "user not found")
    }
    if err != nil {
        return nil, status.Error(codes.Internal, "database error")
    }

    return user, nil
}
Common Status Codes:
  • OK
    : Success
  • Canceled
    : Operation was cancelled
  • Unknown
    : Unknown error
  • InvalidArgument
    : Client specified invalid argument
  • DeadlineExceeded
    : Deadline expired before operation
  • NotFound
    : Entity not found
  • AlreadyExists
    : Entity already exists
  • PermissionDenied
    : Permission denied
  • ResourceExhausted
    : Resource exhausted (rate limit)
  • FailedPrecondition
    : Operation rejected (system not in valid state)
  • Aborted
    : Operation aborted
  • OutOfRange
    : Out of valid range
  • Unimplemented
    : Operation not implemented
  • Internal
    : Internal server error
  • Unavailable
    : Service unavailable
  • DataLoss
    : Unrecoverable data loss
  • Unauthenticated
    : Request lacks valid authentication
gRPC使用标准化状态码进行错误处理:
go
import "google.golang.org/grpc/codes"
import "google.golang.org/grpc/status"

// 返回带有合适状态码的错误
func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
    if req.Id <= 0 {
        return nil, status.Error(codes.InvalidArgument, "id must be positive")
    }

    user, err := s.db.GetUser(req.Id)
    if err == sql.ErrNoRows {
        return nil, status.Error(codes.NotFound, "user not found")
    }
    if err != nil {
        return nil, status.Error(codes.Internal, "database error")
    }

    return user, nil
}
常见状态码:
  • OK
    : 成功
  • Canceled
    : 操作已取消
  • Unknown
    : 未知错误
  • InvalidArgument
    : 客户端指定了无效参数
  • DeadlineExceeded
    : 操作在截止时间前未完成
  • NotFound
    : 实体未找到
  • AlreadyExists
    : 实体已存在
  • PermissionDenied
    : 权限被拒绝
  • ResourceExhausted
    : 资源耗尽(限流)
  • FailedPrecondition
    : 操作被拒绝(系统状态无效)
  • Aborted
    : 操作已中止
  • OutOfRange
    : 超出有效范围
  • Unimplemented
    : 操作未实现
  • Internal
    : 内部服务器错误
  • Unavailable
    : 服务不可用
  • DataLoss
    : 不可恢复的数据丢失
  • Unauthenticated
    : 请求缺少有效认证信息

Rich Error Details

富错误详情

Add structured error details:
go
import "google.golang.org/genproto/googleapis/rpc/errdetails"

func (s *server) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.User, error) {
    // Validate request
    violations := validateCreateUserRequest(req)
    if len(violations) > 0 {
        badRequest := &errdetails.BadRequest{}
        for field, msg := range violations {
            badRequest.FieldViolations = append(
                badRequest.FieldViolations,
                &errdetails.BadRequest_FieldViolation{
                    Field:       field,
                    Description: msg,
                },
            )
        }

        st := status.New(codes.InvalidArgument, "invalid request")
        st, _ = st.WithDetails(badRequest)
        return nil, st.Err()
    }

    // Create user...
}
Client-side error handling:
go
resp, err := client.CreateUser(ctx, req)
if err != nil {
    st := status.Convert(err)

    for _, detail := range st.Details() {
        switch t := detail.(type) {
        case *errdetails.BadRequest:
            for _, violation := range t.FieldViolations {
                fmt.Printf("Invalid field %s: %s\n",
                    violation.Field, violation.Description)
            }
        }
    }
}
添加结构化错误详情:
go
import "google.golang.org/genproto/googleapis/rpc/errdetails"

func (s *server) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.User, error) {
    // 验证请求
    violations := validateCreateUserRequest(req)
    if len(violations) > 0 {
        badRequest := &errdetails.BadRequest{}
        for field, msg := range violations {
            badRequest.FieldViolations = append(
                badRequest.FieldViolations,
                &errdetails.BadRequest_FieldViolation{
                    Field:       field,
                    Description: msg,
                },
            )
        }

        st := status.New(codes.InvalidArgument, "invalid request")
        st, _ = st.WithDetails(badRequest)
        return nil, st.Err()
    }

    // 创建用户...
}
客户端错误处理:
go
resp, err := client.CreateUser(ctx, req)
if err != nil {
    st := status.Convert(err)

    for _, detail := range st.Details() {
        switch t := detail.(type) {
        case *errdetails.BadRequest:
            for _, violation := range t.FieldViolations {
                fmt.Printf("Invalid field %s: %s\n",
                    violation.Field, violation.Description)
            }
        }
    }
}

Error Propagation

错误传播

go
func (s *server) ProcessOrder(ctx context.Context, req *pb.OrderRequest) (*pb.OrderResponse, error) {
    // Call inventory service
    inventory, err := s.inventoryClient.CheckInventory(ctx, &pb.InventoryRequest{
        ProductId: req.ProductId,
    })
    if err != nil {
        // Propagate error with additional context
        st := status.Convert(err)
        return nil, status.Errorf(st.Code(),
            "inventory check failed: %v", st.Message())
    }

    // Continue processing...
}
go
func (s *server) ProcessOrder(ctx context.Context, req *pb.OrderRequest) (*pb.OrderResponse, error) {
    // 调用库存服务
    inventory, err := s.inventoryClient.CheckInventory(ctx, &pb.InventoryRequest{
        ProductId: req.ProductId,
    })
    if err != nil {
        // 携带额外上下文传播错误
        st := status.Convert(err)
        return nil, status.Errorf(st.Code(),
            "inventory check failed: %v", st.Message())
    }

    // 继续处理...
}

Retry Logic

重试逻辑

go
import "google.golang.org/grpc/codes"
import "google.golang.org/grpc/status"

func CallWithRetry(ctx context.Context, maxRetries int, fn func() error) error {
    var err error

    for i := 0; i < maxRetries; i++ {
        err = fn()
        if err == nil {
            return nil
        }

        // Check if error is retryable
        st := status.Convert(err)
        if !isRetryable(st.Code()) {
            return err
        }

        // Exponential backoff
        backoff := time.Duration(math.Pow(2, float64(i))) * time.Second
        time.Sleep(backoff)
    }

    return err
}

func isRetryable(code codes.Code) bool {
    return code == codes.Unavailable ||
           code == codes.DeadlineExceeded ||
           code == codes.ResourceExhausted
}
go
import "google.golang.org/grpc/codes"
import "google.golang.org/grpc/status"

func CallWithRetry(ctx context.Context, maxRetries int, fn func() error) error {
    var err error

    for i := 0; i < maxRetries; i++ {
        err = fn()
        if err == nil {
            return nil
        }

        // 检查错误是否可重试
        st := status.Convert(err)
        if !isRetryable(st.Code()) {
            return err
        }

        // 指数退避
        backoff := time.Duration(math.Pow(2, float64(i))) * time.Second
        time.Sleep(backoff)
    }

    return err
}

func isRetryable(code codes.Code) bool {
    return code == codes.Unavailable ||
           code == codes.DeadlineExceeded ||
           code == codes.ResourceExhausted
}

Best Practices

最佳实践

1. Schema Evolution

1. Schema演进

DO:
  • Always use
    syntax = "proto3"
  • Never reuse field numbers
  • Use
    reserved
    for deprecated fields
  • Add new fields with new numbers
  • Use optional wrappers for nullable fields
protobuf
message User {
  int32 id = 1;
  string name = 2;
  // string age = 3;  // Deprecated
  reserved 3;
  reserved "age";

  string email = 4;
  google.protobuf.Int32Value phone = 5;  // Optional
}
DON'T:
  • Change field types
  • Reuse field numbers
  • Remove fields without reserving numbers
  • Change message names without aliases
建议:
  • 始终使用
    syntax = "proto3"
  • 绝不复用字段编号
  • 对废弃字段使用
    reserved
  • 使用新编号添加新字段
  • 对可空字段使用可选包装类型
protobuf
message User {
  int32 id = 1;
  string name = 2;
  // string age = 3;  // 已废弃
  reserved 3;
  reserved "age";

  string email = 4;
  google.protobuf.Int32Value phone = 5;  // 可选字段
}
禁止:
  • 更改字段类型
  • 复用字段编号
  • 不保留编号就删除字段
  • 不使用别名就更改消息名称

2. Performance Optimization

2. 性能优化

Connection Management:
go
// Reuse connections
var conn *grpc.ClientConn
var once sync.Once

func getConnection() *grpc.ClientConn {
    once.Do(func() {
        var err error
        conn, err = grpc.Dial(
            address,
            grpc.WithKeepaliveParams(keepalive.ClientParameters{
                Time:                10 * time.Second,
                Timeout:             3 * time.Second,
                PermitWithoutStream: true,
            }),
        )
        if err != nil {
            log.Fatal(err)
        }
    })
    return conn
}
Connection Pooling:
go
type ConnectionPool struct {
    connections []*grpc.ClientConn
    next        uint32
}

func (p *ConnectionPool) GetConnection() *grpc.ClientConn {
    n := atomic.AddUint32(&p.next, 1)
    return p.connections[n%uint32(len(p.connections))]
}
Streaming for Large Data:
protobuf
// Instead of this:
rpc GetAllUsers(Empty) returns (UserList);  // Large response

// Use this:
rpc ListUsers(ListUsersRequest) returns (stream User);  // Streamed
连接管理:
go
// 复用连接
var conn *grpc.ClientConn
var once sync.Once

func getConnection() *grpc.ClientConn {
    once.Do(func() {
        var err error
        conn, err = grpc.Dial(
            address,
            grpc.WithKeepaliveParams(keepalive.ClientParameters{
                Time:                10 * time.Second,
                Timeout:             3 * time.Second,
                PermitWithoutStream: true,
            }),
        )
        if err != nil {
            log.Fatal(err)
        }
    })
    return conn
}
连接池:
go
type ConnectionPool struct {
    connections []*grpc.ClientConn
    next        uint32
}

func (p *ConnectionPool) GetConnection() *grpc.ClientConn {
    n := atomic.AddUint32(&p.next, 1)
    return p.connections[n%uint32(len(p.connections))]
}
大数据流处理:
protobuf
// 不推荐:
rpc GetAllUsers(Empty) returns (UserList);  // 大型响应

// 推荐:
rpc ListUsers(ListUsersRequest) returns (stream User);  // 流式传输

3. Security Best Practices

3. 安全最佳实践

TLS Configuration:
go
// Server-side
creds, err := credentials.NewServerTLSFromFile(certFile, keyFile)
server := grpc.NewServer(grpc.Creds(creds))

// Client-side
creds, err := credentials.NewClientTLSFromFile(certFile, "")
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(creds))
Mutual TLS (mTLS):
go
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
certPool := x509.NewCertPool()
ca, err := ioutil.ReadFile(caFile)
certPool.AppendCertsFromPEM(ca)

creds := credentials.NewTLS(&tls.Config{
    Certificates: []tls.Certificate{cert},
    ClientAuth:   tls.RequireAndVerifyClientCert,
    ClientCAs:    certPool,
})

server := grpc.NewServer(grpc.Creds(creds))
Token Authentication:
go
type tokenAuth struct {
    token string
}

func (t tokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
    return map[string]string{
        "authorization": "Bearer " + t.token,
    }, nil
}

func (t tokenAuth) RequireTransportSecurity() bool {
    return true
}

// Usage
conn, err := grpc.Dial(
    address,
    grpc.WithPerRPCCredentials(tokenAuth{token: "my-token"}),
)
TLS配置:
go
// 服务器端
creds, err := credentials.NewServerTLSFromFile(certFile, keyFile)
server := grpc.NewServer(grpc.Creds(creds))

// 客户端
creds, err := credentials.NewClientTLSFromFile(certFile, "")
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(creds))
双向TLS(mTLS):
go
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
certPool := x509.NewCertPool()
ca, err := ioutil.ReadFile(caFile)
certPool.AppendCertsFromPEM(ca)

creds := credentials.NewTLS(&tls.Config{
    Certificates: []tls.Certificate{cert},
    ClientAuth:   tls.RequireAndVerifyClientCert,
    ClientCAs:    certPool,
})

server := grpc.NewServer(grpc.Creds(creds))
令牌认证:
go
type tokenAuth struct {
    token string
}

func (t tokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
    return map[string]string{
        "authorization": "Bearer " + t.token,
    }, nil
}

func (t tokenAuth) RequireTransportSecurity() bool {
    return true
}

// 使用方式
conn, err := grpc.Dial(
    address,
    grpc.WithPerRPCCredentials(tokenAuth{token: "my-token"}),
)

4. Timeout and Deadline Management

4. 超时与截止时间管理

go
// Set deadline for request
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

resp, err := client.GetUser(ctx, &pb.GetUserRequest{Id: 123})
if err != nil {
    if status.Code(err) == codes.DeadlineExceeded {
        log.Println("Request timed out")
    }
}
Server-side deadline propagation:
go
func (s *server) ComplexOperation(ctx context.Context, req *pb.Request) (*pb.Response, error) {
    // Check if deadline is already exceeded
    deadline, ok := ctx.Deadline()
    if ok && time.Now().After(deadline) {
        return nil, status.Error(codes.DeadlineExceeded, "deadline exceeded")
    }

    // Propagate context to downstream calls
    user, err := s.userClient.GetUser(ctx, &pb.GetUserRequest{Id: req.UserId})
    if err != nil {
        return nil, err
    }

    // Continue with remaining time...
}
go
// 为请求设置截止时间
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

resp, err := client.GetUser(ctx, &pb.GetUserRequest{Id: 123})
if err != nil {
    if status.Code(err) == codes.DeadlineExceeded {
        log.Println("Request timed out")
    }
}
服务器端截止时间传播:
go
func (s *server) ComplexOperation(ctx context.Context, req *pb.Request) (*pb.Response, error) {
    // 检查截止时间是否已过期
    deadline, ok := ctx.Deadline()
    if ok && time.Now().After(deadline) {
        return nil, status.Error(codes.DeadlineExceeded, "deadline exceeded")
    }

    // 向下游调用传播上下文
    user, err := s.userClient.GetUser(ctx, &pb.GetUserRequest{Id: req.UserId})
    if err != nil {
        return nil, err
    }

    // 利用剩余时间继续处理...
}

5. Monitoring and Observability

5. 监控与可观测性

Prometheus Metrics:
go
import "github.com/grpc-ecosystem/go-grpc-prometheus"

// Server metrics
grpcMetrics := grpc_prometheus.NewServerMetrics()
server := grpc.NewServer(
    grpc.UnaryInterceptor(grpcMetrics.UnaryServerInterceptor()),
    grpc.StreamInterceptor(grpcMetrics.StreamServerInterceptor()),
)
grpcMetrics.InitializeMetrics(server)

// Expose metrics
http.Handle("/metrics", promhttp.Handler())
Prometheus指标:
go
import "github.com/grpc-ecosystem/go-grpc-prometheus"

// 服务器指标
grpcMetrics := grpc_prometheus.NewServerMetrics()
server := grpc.NewServer(
    grpc.UnaryInterceptor(grpcMetrics.UnaryServerInterceptor()),
    grpc.StreamInterceptor(grpcMetrics.StreamServerInterceptor()),
)
grpcMetrics.InitializeMetrics(server)

// 暴露指标
http.Handle("/metrics", promhttp.Handler())

6. Graceful Shutdown

6. 优雅关闭

go
server := grpc.NewServer()
// Register services...

go func() {
    if err := server.Serve(listener); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}()

// Wait for interrupt signal
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
<-quit

log.Println("Shutting down server...")

// Graceful shutdown
server.GracefulStop()
log.Println("Server stopped")
go
server := grpc.NewServer()
// 注册服务...

go func() {
    if err := server.Serve(listener); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}()

// 等待中断信号
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
<-quit

log.Println("Shutting down server...")

// 优雅关闭
server.GracefulStop()
log.Println("Server stopped")

7. Service Versioning

7. 服务版本化

URL-based versioning:
protobuf
package api.v1;

service UserServiceV1 {
  rpc GetUser(GetUserRequest) returns (User);
}

package api.v2;

service UserServiceV2 {
  rpc GetUser(GetUserRequest) returns (User);
}
Field-based versioning:
protobuf
message User {
  int32 id = 1;
  string name = 2;
  string email = 3;

  // v2 additions
  string phone = 4;
  Address address = 5;
}
基于URL的版本化:
protobuf
package api.v1;

service UserServiceV1 {
  rpc GetUser(GetUserRequest) returns (User);
}

package api.v2;

service UserServiceV2 {
  rpc GetUser(GetUserRequest) returns (User);
}
基于字段的版本化:
protobuf
message User {
  int32 id = 1;
  string name = 2;
  string email = 3;

  // v2新增字段
  string phone = 4;
  Address address = 5;
}

8. Testing Best Practices

8. 测试最佳实践

Unit Testing with Mocks:
go
type mockUserClient struct {
    pb.UserServiceClient
    getUserFunc func(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error)
}

func (m *mockUserClient) GetUser(ctx context.Context, req *pb.GetUserRequest, opts ...grpc.CallOption) (*pb.User, error) {
    return m.getUserFunc(ctx, req)
}

func TestOrderService(t *testing.T) {
    mockClient := &mockUserClient{
        getUserFunc: func(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
            return &pb.User{Id: 1, Name: "Test User"}, nil
        },
    }

    // Test with mock...
}
Integration Testing:
go
func TestIntegration(t *testing.T) {
    // Start test server
    lis, err := net.Listen("tcp", ":0")
    require.NoError(t, err)

    server := grpc.NewServer()
    pb.RegisterUserServiceServer(server, &userServer{})

    go server.Serve(lis)
    defer server.Stop()

    // Connect client
    conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
    require.NoError(t, err)
    defer conn.Close()

    client := pb.NewUserServiceClient(conn)

    // Test requests...
}
使用Mock进行单元测试:
go
type mockUserClient struct {
    pb.UserServiceClient
    getUserFunc func(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error)
}

func (m *mockUserClient) GetUser(ctx context.Context, req *pb.GetUserRequest, opts ...grpc.CallOption) (*pb.User, error) {
    return m.getUserFunc(ctx, req)
}

func TestOrderService(t *testing.T) {
    mockClient := &mockUserClient{
        getUserFunc: func(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
            return &pb.User{Id: 1, Name: "Test User"}, nil
        },
    }

    // 使用Mock进行测试...
}
集成测试:
go
func TestIntegration(t *testing.T) {
    // 启动测试服务器
    lis, err := net.Listen("tcp", ":0")
    require.NoError(t, err)

    server := grpc.NewServer()
    pb.RegisterUserServiceServer(server, &userServer{})

    go server.Serve(lis)
    defer server.Stop()

    // 连接客户端
    conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
    require.NoError(t, err)
    defer conn.Close()

    client := pb.NewUserServiceClient(conn)

    // 测试请求...
}

Production Deployment Patterns

生产部署模式

Docker Deployment

Docker部署

Dockerfile:
dockerfile
FROM golang:1.21-alpine AS builder

WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download

COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o server ./cmd/server

FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/

COPY --from=builder /app/server .
COPY --from=builder /app/proto ./proto

EXPOSE 50051

CMD ["./server"]
Dockerfile:
dockerfile
FROM golang:1.21-alpine AS builder

WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download

COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o server ./cmd/server

FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/

COPY --from=builder /app/server .
COPY --from=builder /app/proto ./proto

EXPOSE 50051

CMD ["./server"]

Kubernetes Deployment

Kubernetes部署

deployment.yaml:
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: grpc-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: grpc-service
  template:
    metadata:
      labels:
        app: grpc-service
    spec:
      containers:
      - name: grpc-service
        image: grpc-service:latest
        ports:
        - containerPort: 50051
          name: grpc
          protocol: TCP
        env:
        - name: PORT
          value: "50051"
        livenessProbe:
          exec:
            command: ["/bin/grpc_health_probe", "-addr=:50051"]
          initialDelaySeconds: 10
        readinessProbe:
          exec:
            command: ["/bin/grpc_health_probe", "-addr=:50051"]
          initialDelaySeconds: 5
        resources:
          requests:
            memory: "128Mi"
            cpu: "100m"
          limits:
            memory: "512Mi"
            cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:
  name: grpc-service
spec:
  selector:
    app: grpc-service
  ports:
  - port: 50051
    targetPort: 50051
    protocol: TCP
    name: grpc
  type: ClusterIP
deployment.yaml:
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: grpc-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: grpc-service
  template:
    metadata:
      labels:
        app: grpc-service
    spec:
      containers:
      - name: grpc-service
        image: grpc-service:latest
        ports:
        - containerPort: 50051
          name: grpc
          protocol: TCP
        env:
        - name: PORT
          value: "50051"
        livenessProbe:
          exec:
            command: ["/bin/grpc_health_probe", "-addr=:50051"]
          initialDelaySeconds: 10
        readinessProbe:
          exec:
            command: ["/bin/grpc_health_probe", "-addr=:50051"]
          initialDelaySeconds: 5
        resources:
          requests:
            memory: "128Mi"
            cpu: "100m"
          limits:
            memory: "512Mi"
            cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:
  name: grpc-service
spec:
  selector:
    app: grpc-service
  ports:
  - port: 50051
    targetPort: 50051
    protocol: TCP
    name: grpc
  type: ClusterIP

Service Mesh Integration (Istio)

服务网格集成(Istio)

VirtualService for traffic routing:
yaml
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: grpc-service
spec:
  hosts:
  - grpc-service
  http:
  - match:
    - headers:
        version:
          exact: v2
    route:
    - destination:
        host: grpc-service
        subset: v2
  - route:
    - destination:
        host: grpc-service
        subset: v1
流量路由VirtualService:
yaml
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: grpc-service
spec:
  hosts:
  - grpc-service
  http:
  - match:
    - headers:
        version:
          exact: v2
    route:
    - destination:
        host: grpc-service
        subset: v2
  - route:
    - destination:
        host: grpc-service
        subset: v1

Common Patterns and Anti-Patterns

常见模式与反模式

✅ DO:

✅ 建议:

  1. Use streaming for large datasets
  2. Implement proper error handling with status codes
  3. Add interceptors for cross-cutting concerns
  4. Use connection pooling for high-throughput clients
  5. Implement health checks
  6. Set appropriate timeouts and deadlines
  7. Use TLS in production
  8. Version your APIs
  9. Monitor with metrics and tracing
  10. Test with integration tests
  1. 对大型数据集使用流传输
  2. 使用状态码实现正确的错误处理
  3. 为横切关注点添加拦截器
  4. 对高吞吐量客户端使用连接池
  5. 实现健康检查
  6. 设置合适的超时与截止时间
  7. 在生产环境中使用TLS
  8. 对API进行版本化
  9. 使用指标与追踪进行监控
  10. 通过集成测试验证功能

❌ DON'T:

❌ 禁止:

  1. Don't use unary RPCs for large datasets - Use streaming instead
  2. Don't ignore context cancellation - Always check context.Done()
  3. Don't create new connections per request - Reuse connections
  4. Don't skip authentication/authorization - Always validate
  5. Don't forget graceful shutdown - Handle SIGTERM properly
  6. Don't hardcode endpoints - Use service discovery
  7. Don't ignore errors - Handle all error cases
  8. Don't use blocking operations without timeouts - Always set deadlines
  9. Don't skip health checks - Implement liveness/readiness probes
  10. Don't deploy without monitoring - Add metrics and logging

Skill Version: 1.0.0 Last Updated: October 2025 Skill Category: Microservices, gRPC, Distributed Systems, API Design Compatible With: Go, Python, Node.js, Java, C++, C#, Ruby, and more
  1. 不要对大型数据集使用一元RPC - 改用流传输
  2. 不要忽略上下文取消 - 始终检查context.Done()
  3. 不要为每个请求创建新连接 - 复用连接
  4. 不要跳过认证/授权 - 始终进行验证
  5. 不要忘记优雅关闭 - 正确处理SIGTERM信号
  6. 不要硬编码端点 - 使用服务发现
  7. 不要忽略错误 - 处理所有错误场景
  8. 不要在无超时的情况下使用阻塞操作 - 始终设置截止时间
  9. 不要跳过健康检查 - 实现存活/就绪探针
  10. 不要在无监控的情况下部署 - 添加指标与日志

技能版本: 1.0.0 最后更新: 2025年10月 技能分类: 微服务, gRPC, 分布式系统, API设计 兼容语言: Go, Python, Node.js, Java, C++, C#, Ruby等