grpc-expert

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

gRPC Expert

gRPC专家

Expert guidance for gRPC services, Protocol Buffers, microservices communication, and streaming patterns.
提供gRPC服务、Protocol Buffers、微服务通信与流模式的专业指导。

Core Concepts

核心概念

gRPC Fundamentals

gRPC基础

  • Protocol Buffers (protobuf)
  • Service definitions
  • RPC patterns (unary, server streaming, client streaming, bidirectional)
  • HTTP/2 transport
  • Code generation
  • Interceptors and middleware
  • Protocol Buffers (protobuf)
  • 服务定义
  • RPC模式(一元、服务端流、客户端流、双向流)
  • HTTP/2传输
  • 代码生成
  • 拦截器与中间件

Communication Patterns

通信模式

  • Unary RPC (request-response)
  • Server streaming RPC
  • Client streaming RPC
  • Bidirectional streaming RPC
  • Deadline/timeout handling
  • Error handling and status codes
  • 一元RPC(请求-响应)
  • 服务端流RPC
  • 客户端流RPC
  • 双向流RPC
  • 截止时间/超时处理
  • 错误处理与状态码

Production Features

生产环境特性

  • Load balancing
  • Service discovery
  • Health checking
  • Authentication (TLS, tokens)
  • Monitoring and tracing
  • Retry policies
  • 负载均衡
  • 服务发现
  • 健康检查
  • 身份认证(TLS、令牌)
  • 监控与链路追踪
  • 重试策略

Protocol Buffer Definition

Protocol Buffer定义

protobuf
syntax = "proto3";

package user.v1;

import "google/protobuf/timestamp.proto";

service UserService {
  // Unary RPC
  rpc GetUser(GetUserRequest) returns (GetUserResponse);

  // Server streaming
  rpc ListUsers(ListUsersRequest) returns (stream User);

  // Client streaming
  rpc CreateUsers(stream CreateUserRequest) returns (CreateUsersResponse);

  // Bidirectional streaming
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

message User {
  string id = 1;
  string email = 2;
  string name = 3;
  google.protobuf.Timestamp created_at = 4;
  UserRole role = 5;
}

enum UserRole {
  USER_ROLE_UNSPECIFIED = 0;
  USER_ROLE_USER = 1;
  USER_ROLE_ADMIN = 2;
}

message GetUserRequest {
  string id = 1;
}

message GetUserResponse {
  User user = 1;
}

message ListUsersRequest {
  int32 page_size = 1;
  string page_token = 2;
}

message CreateUserRequest {
  string email = 1;
  string name = 2;
}

message CreateUsersResponse {
  repeated string user_ids = 1;
  int32 created_count = 2;
}

message ChatMessage {
  string user_id = 1;
  string message = 2;
  google.protobuf.Timestamp timestamp = 3;
}
protobuf
syntax = "proto3";

package user.v1;

import "google/protobuf/timestamp.proto";

service UserService {
  // Unary RPC
  rpc GetUser(GetUserRequest) returns (GetUserResponse);

  // Server streaming
  rpc ListUsers(ListUsersRequest) returns (stream User);

  // Client streaming
  rpc CreateUsers(stream CreateUserRequest) returns (CreateUsersResponse);

  // Bidirectional streaming
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

message User {
  string id = 1;
  string email = 2;
  string name = 3;
  google.protobuf.Timestamp created_at = 4;
  UserRole role = 5;
}

enum UserRole {
  USER_ROLE_UNSPECIFIED = 0;
  USER_ROLE_USER = 1;
  USER_ROLE_ADMIN = 2;
}

message GetUserRequest {
  string id = 1;
}

message GetUserResponse {
  User user = 1;
}

message ListUsersRequest {
  int32 page_size = 1;
  string page_token = 2;
}

message CreateUserRequest {
  string email = 1;
  string name = 2;
}

message CreateUsersResponse {
  repeated string user_ids = 1;
  int32 created_count = 2;
}

message ChatMessage {
  string user_id = 1;
  string message = 2;
  google.protobuf.Timestamp timestamp = 3;
}

Python gRPC Server

Python gRPC服务端

python
import grpc
from concurrent import futures
import logging
from typing import Iterator

import user_pb2
import user_pb2_grpc

class UserService(user_pb2_grpc.UserServiceServicer):
    def __init__(self):
        self.users = {}

    def GetUser(self, request, context):
        """Unary RPC"""
        user_id = request.id

        if user_id not in self.users:
            context.abort(grpc.StatusCode.NOT_FOUND, f"User {user_id} not found")

        user = self.users[user_id]
        return user_pb2.GetUserResponse(user=user)

    def ListUsers(self, request, context):
        """Server streaming RPC"""
        page_size = request.page_size or 10

        for i, user in enumerate(self.users.values()):
            if i >= page_size:
                break
            yield user

    def CreateUsers(self, request_iterator, context):
        """Client streaming RPC"""
        created_ids = []

        for request in request_iterator:
            user_id = self._generate_id()
            user = user_pb2.User(
                id=user_id,
                email=request.email,
                name=request.name
            )
            self.users[user_id] = user
            created_ids.append(user_id)

        return user_pb2.CreateUsersResponse(
            user_ids=created_ids,
            created_count=len(created_ids)
        )

    def Chat(self, request_iterator, context):
        """Bidirectional streaming RPC"""
        for message in request_iterator:
            # Echo back with modification
            response = user_pb2.ChatMessage(
                user_id="server",
                message=f"Echo: {message.message}",
                timestamp=message.timestamp
            )
            yield response

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    user_pb2_grpc.add_UserServiceServicer_to_server(UserService(), server)

    server.add_insecure_port('[::]:50051')
    server.start()
    print("Server started on port 50051")
    server.wait_for_termination()

if __name__ == '__main__':
    logging.basicConfig()
    serve()
python
import grpc
from concurrent import futures
import logging
from typing import Iterator

import user_pb2
import user_pb2_grpc

class UserService(user_pb2_grpc.UserServiceServicer):
    def __init__(self):
        self.users = {}

    def GetUser(self, request, context):
        """Unary RPC"""
        user_id = request.id

        if user_id not in self.users:
            context.abort(grpc.StatusCode.NOT_FOUND, f"User {user_id} not found")

        user = self.users[user_id]
        return user_pb2.GetUserResponse(user=user)

    def ListUsers(self, request, context):
        """Server streaming RPC"""
        page_size = request.page_size or 10

        for i, user in enumerate(self.users.values()):
            if i >= page_size:
                break
            yield user

    def CreateUsers(self, request_iterator, context):
        """Client streaming RPC"""
        created_ids = []

        for request in request_iterator:
            user_id = self._generate_id()
            user = user_pb2.User(
                id=user_id,
                email=request.email,
                name=request.name
            )
            self.users[user_id] = user
            created_ids.append(user_id)

        return user_pb2.CreateUsersResponse(
            user_ids=created_ids,
            created_count=len(created_ids)
        )

    def Chat(self, request_iterator, context):
        """Bidirectional streaming RPC"""
        for message in request_iterator:
            # Echo back with modification
            response = user_pb2.ChatMessage(
                user_id="server",
                message=f"Echo: {message.message}",
                timestamp=message.timestamp
            )
            yield response

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    user_pb2_grpc.add_UserServiceServicer_to_server(UserService(), server)

    server.add_insecure_port('[::]:50051')
    server.start()
    print("Server started on port 50051")
    server.wait_for_termination()

if __name__ == '__main__':
    logging.basicConfig()
    serve()

Go gRPC Server

Go gRPC服务端

go
package main

import (
    "context"
    "io"
    "log"
    "net"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    pb "example.com/user/v1"
)

type userServer struct {
    pb.UnimplementedUserServiceServer
    users map[string]*pb.User
}

func (s *userServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
    user, exists := s.users[req.Id]
    if !exists {
        return nil, status.Errorf(codes.NotFound, "user %s not found", req.Id)
    }

    return &pb.GetUserResponse{User: user}, nil
}

func (s *userServer) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
    pageSize := req.PageSize
    if pageSize == 0 {
        pageSize = 10
    }

    count := 0
    for _, user := range s.users {
        if count >= int(pageSize) {
            break
        }

        if err := stream.Send(user); err != nil {
            return err
        }
        count++
    }

    return nil
}

func (s *userServer) CreateUsers(stream pb.UserService_CreateUsersServer) error {
    var userIds []string

    for {
        req, err := stream.Recv()
        if err == io.EOF {
            return stream.SendAndClose(&pb.CreateUsersResponse{
                UserIds:      userIds,
                CreatedCount: int32(len(userIds)),
            })
        }
        if err != nil {
            return err
        }

        userId := generateID()
        user := &pb.User{
            Id:    userId,
            Email: req.Email,
            Name:  req.Name,
        }

        s.users[userId] = user
        userIds = append(userIds, userId)
    }
}

func (s *userServer) Chat(stream pb.UserService_ChatServer) error {
    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }

        response := &pb.ChatMessage{
            UserId:    "server",
            Message:   "Echo: " + msg.Message,
            Timestamp: msg.Timestamp,
        }

        if err := stream.Send(response); err != nil {
            return err
        }
    }
}

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }

    s := grpc.NewServer()
    pb.RegisterUserServiceServer(s, &userServer{
        users: make(map[string]*pb.User),
    })

    log.Println("Server listening on :50051")
    if err := s.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}
go
package main

import (
    "context"
    "io"
    "log"
    "net"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    pb "example.com/user/v1"
)

type userServer struct {
    pb.UnimplementedUserServiceServer
    users map[string]*pb.User
}

func (s *userServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
    user, exists := s.users[req.Id]
    if !exists {
        return nil, status.Errorf(codes.NotFound, "user %s not found", req.Id)
    }

    return &pb.GetUserResponse{User: user}, nil
}

func (s *userServer) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
    pageSize := req.PageSize
    if pageSize == 0 {
        pageSize = 10
    }

    count := 0
    for _, user := range s.users {
        if count >= int(pageSize) {
            break
        }

        if err := stream.Send(user); err != nil {
            return err
        }
        count++
    }

    return nil
}

func (s *userServer) CreateUsers(stream pb.UserService_CreateUsersServer) error {
    var userIds []string

    for {
        req, err := stream.Recv()
        if err == io.EOF {
            return stream.SendAndClose(&pb.CreateUsersResponse{
                UserIds:      userIds,
                CreatedCount: int32(len(userIds)),
            })
        }
        if err != nil {
            return err
        }

        userId := generateID()
        user := &pb.User{
            Id:    userId,
            Email: req.Email,
            Name:  req.Name,
        }

        s.users[userId] = user
        userIds = append(userIds, userId)
    }
}

func (s *userServer) Chat(stream pb.UserService_ChatServer) error {
    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }

        response := &pb.ChatMessage{
            UserId:    "server",
            Message:   "Echo: " + msg.Message,
            Timestamp: msg.Timestamp,
        }

        if err := stream.Send(response); err != nil {
            return err
        }
    }
}

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }

    s := grpc.NewServer()
    pb.RegisterUserServiceServer(s, &userServer{
        users: make(map[string]*pb.User),
    })

    log.Println("Server listening on :50051")
    if err := s.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

gRPC Client with Interceptors

带拦截器的gRPC客户端

python
import grpc
import user_pb2
import user_pb2_grpc
from typing import Callable

def logging_interceptor(
    method: Callable,
    request,
    call_details: grpc.ClientCallDetails
):
    """Unary-unary interceptor for logging"""
    print(f"Calling method: {call_details.method}")
    print(f"Request: {request}")

    response = method(request, call_details)

    print(f"Response: {response}")
    return response

class UserClient:
    def __init__(self, host: str = 'localhost:50051'):
        # Create channel with interceptor
        self.channel = grpc.insecure_channel(host)
        self.channel = grpc.intercept_channel(
            self.channel,
            logging_interceptor
        )
        self.stub = user_pb2_grpc.UserServiceStub(self.channel)

    def get_user(self, user_id: str) -> user_pb2.User:
        """Call unary RPC"""
        request = user_pb2.GetUserRequest(id=user_id)

        try:
            response = self.stub.GetUser(
                request,
                timeout=5.0  # 5 second timeout
            )
            return response.user
        except grpc.RpcError as e:
            print(f"RPC failed: {e.code()} - {e.details()}")
            raise

    def list_users(self, page_size: int = 10):
        """Call server streaming RPC"""
        request = user_pb2.ListUsersRequest(page_size=page_size)

        try:
            for user in self.stub.ListUsers(request):
                yield user
        except grpc.RpcError as e:
            print(f"RPC failed: {e.code()} - {e.details()}")

    def create_users_batch(self, users: list):
        """Call client streaming RPC"""
        def request_generator():
            for user in users:
                yield user_pb2.CreateUserRequest(
                    email=user['email'],
                    name=user['name']
                )

        try:
            response = self.stub.CreateUsers(request_generator())
            return response
        except grpc.RpcError as e:
            print(f"RPC failed: {e.code()} - {e.details()}")

    def close(self):
        self.channel.close()
python
import grpc
import user_pb2
import user_pb2_grpc
from typing import Callable

def logging_interceptor(
    method: Callable,
    request,
    call_details: grpc.ClientCallDetails
):
    """Unary-unary interceptor for logging"""
    print(f"Calling method: {call_details.method}")
    print(f"Request: {request}")

    response = method(request, call_details)

    print(f"Response: {response}")
    return response

class UserClient:
    def __init__(self, host: str = 'localhost:50051'):
        # Create channel with interceptor
        self.channel = grpc.insecure_channel(host)
        self.channel = grpc.intercept_channel(
            self.channel,
            logging_interceptor
        )
        self.stub = user_pb2_grpc.UserServiceStub(self.channel)

    def get_user(self, user_id: str) -> user_pb2.User:
        """Call unary RPC"""
        request = user_pb2.GetUserRequest(id=user_id)

        try:
            response = self.stub.GetUser(
                request,
                timeout=5.0  # 5 second timeout
            )
            return response.user
        except grpc.RpcError as e:
            print(f"RPC failed: {e.code()} - {e.details()}")
            raise

    def list_users(self, page_size: int = 10):
        """Call server streaming RPC"""
        request = user_pb2.ListUsersRequest(page_size=page_size)

        try:
            for user in self.stub.ListUsers(request):
                yield user
        except grpc.RpcError as e:
            print(f"RPC failed: {e.code()} - {e.details()}")

    def create_users_batch(self, users: list):
        """Call client streaming RPC"""
        def request_generator():
            for user in users:
                yield user_pb2.CreateUserRequest(
                    email=user['email'],
                    name=user['name']
                )

        try:
            response = self.stub.CreateUsers(request_generator())
            return response
        except grpc.RpcError as e:
            print(f"RPC failed: {e.code()} - {e.details()}")

    def close(self):
        self.channel.close()

Best Practices

最佳实践

Design

设计

  • Use semantic versioning for protobuf packages
  • Design backward-compatible changes
  • Use proper field numbering (never reuse)
  • Include metadata in messages
  • Use enums with explicit UNSPECIFIED value
  • Design for pagination in list operations
  • Protobuf包使用语义化版本号
  • 设计向后兼容的变更
  • 使用合理的字段编号(永远不要复用编号)
  • 消息中包含元数据
  • 使用带明确UNSPECIFIED值的枚举
  • 列表操作设计分页能力

Performance

性能

  • Enable HTTP/2 connection pooling
  • Use streaming for large data transfers
  • Implement proper timeouts
  • Use compression for large payloads
  • Batch operations when possible
  • Monitor and tune thread pool sizes
  • 开启HTTP/2连接池
  • 大数据传输使用流模式
  • 实现合理的超时设置
  • 大载荷使用压缩
  • 尽可能批量操作
  • 监控并调优线程池大小

Production

生产环境

  • Implement health checks
  • Use TLS for secure communication
  • Add authentication/authorization
  • Implement retry policies with backoff
  • Monitor gRPC metrics (latency, errors)
  • Use service discovery for dynamic endpoints
  • 实现健康检查
  • 使用TLS保障通信安全
  • 添加身份认证/授权机制
  • 实现带退避策略的重试机制
  • 监控gRPC指标(延迟、错误)
  • 动态端点使用服务发现

Anti-Patterns

反模式

❌ Breaking backward compatibility ❌ No timeout configuration ❌ Ignoring error status codes ❌ Not handling stream cancellation ❌ Over-sized messages ❌ No health checking ❌ Missing authentication
❌ 破坏向后兼容性 ❌ 无超时配置 ❌ 忽略错误状态码 ❌ 不处理流取消 ❌ 消息体积过大 ❌ 无健康检查 ❌ 缺失身份认证

Resources

资源