Loading...
Loading...
Production gRPC in Go: protobuf layout, codegen, interceptors, deadlines, error codes, streaming, health checks, TLS, and testing with bufconn
npx skill4agent add bobmatnyc/claude-mpm-skills golang-grpc

// proto/users/v1/users.proto
syntax = "proto3";
package users.v1;
option go_package = "example.com/myapp/gen/users/v1;usersv1";
service UsersService {
rpc GetUser(GetUserRequest) returns (GetUserResponse);
rpc ListUsers(ListUsersRequest) returns (stream User);
}
message GetUserRequest { string id = 1; }
message GetUserResponse { User user = 1; }
message ListUsersRequest { int32 page_size = 1; string page_token = 2; }
message User {
string id = 1;
string email = 2;
string display_name = 3;
}

package users;

go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

protoc -I proto \
--go_out=./gen --go_opt=paths=source_relative \
--go-grpc_out=./gen --go-grpc_opt=paths=source_relative \
proto/users/v1/users.proto

package usersvc
import (
	"context"
	"errors"

	usersv1 "example.com/myapp/gen/users/v1"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)
// Service is the gRPC server implementation for UsersService.
//
// It embeds the generated usersv1.UnimplementedUsersServiceServer and
// delegates data access to Repo.
type Service struct {
	usersv1.UnimplementedUsersServiceServer

	// Repo is the storage dependency the RPC handlers read users from.
	Repo Repo
}
// Repo is the storage abstraction the Service handlers depend on.
type Repo interface {
	// GetUser returns the user stored under id. The GetUser handler treats
	// an ErrNotFound result as "no such user"; any other error is reported
	// to the client as an internal failure.
	GetUser(ctx context.Context, id string) (User, error)

	// ListUsers returns the users to stream for a ListUsers call. pageSize
	// is the page size taken from the request; the ListUsers handler passes
	// it through unmodified.
	ListUsers(ctx context.Context, pageSize int) ([]User, error)
}
// User is the repository-level user record. Handlers copy its fields into
// the usersv1.User message before sending it to the client.
type User struct {
	ID          string
	Email       string
	DisplayName string
}
func (s *Service) GetUser(ctx context.Context, req *usersv1.GetUserRequest) (*usersv1.GetUserResponse, error) {
if req.GetId() == "" {
return nil, status.Error(codes.InvalidArgument, "id is required")
}
u, err := s.Repo.GetUser(ctx, req.GetId())
if err != nil {
if err == ErrNotFound {
return nil, status.Error(codes.NotFound, "user not found")
}
return nil, status.Error(codes.Internal, "internal error")
}
return &usersv1.GetUserResponse{
User: &usersv1.User{
Id: u.ID,
Email: u.Email,
DisplayName: u.DisplayName,
},
}, nil
}return nil, errors.New("user not found")if _, ok := ctx.Deadline(); !ok {
return nil, status.Error(codes.InvalidArgument, "deadline required")
}

md, _ := metadata.FromIncomingContext(ctx)
auth := ""
if vals := md.Get("authorization"); len(vals) > 0 {
auth = vals[0]
}

// unaryRequestID returns a unary server interceptor that generates a new
// UUID for every call, stores it in the context under requestIDKey{}, and
// then invokes the handler with that derived context.
func unaryRequestID() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
		tagged := context.WithValue(ctx, requestIDKey{}, uuid.NewString())
		return handler(tagged, req)
	}
}

// ListUsers handles the server-streaming ListUsers RPC.
//
// It loads users from the repository using the requested page size and
// sends them to the client one message at a time. A repository failure is
// reported as codes.Internal. Before each send the stream context is
// checked, so a cancelled or expired call stops early and returns the
// context error; a failed Send is returned as is.
func (s *Service) ListUsers(req *usersv1.ListUsersRequest, stream usersv1.UsersService_ListUsersServer) error {
	ctx := stream.Context()

	users, err := s.Repo.ListUsers(ctx, int(req.GetPageSize()))
	if err != nil {
		return status.Error(codes.Internal, "internal error")
	}

	for _, u := range users {
		// Non-blocking check: stop streaming once the client has gone away.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		msg := &usersv1.User{
			Id:          u.ID,
			Email:       u.Email,
			DisplayName: u.DisplayName,
		}
		if sendErr := stream.Send(msg); sendErr != nil {
			return sendErr
		}
	}
	return nil
}

hs := health.NewServer()
grpc_health_v1.RegisterHealthServer(s, hs)
if env != "production" {
reflection.Register(s)
}

GracefulStop

stopped := make(chan struct{})
go func() {
grpcServer.GracefulStop()
close(stopped)
}()
select {
case <-stopped:
case <-time.After(10 * time.Second):
grpcServer.Stop()
}

creds, err := credentials.NewServerTLSFromFile("server.crt", "server.key")
if err != nil { return err }
grpcServer := grpc.NewServer(grpc.Creds(creds))

bufconn

const bufSize = 1024 * 1024
lis := bufconn.Listen(bufSize)
srv := grpc.NewServer()
usersv1.RegisterUsersServiceServer(srv, &Service{Repo: repo})
go func() { _ = srv.Serve(lis) }()
ctx := context.Background()
conn, err := grpc.DialContext(
ctx,
"bufnet",
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { return lis.Dial() }),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil { t.Fatal(err) }
defer conn.Close()
client := usersv1.NewUsersServiceClient(conn)
resp, err := client.GetUser(ctx, &usersv1.GetUserRequest{Id: "1"})
_ = resp
_ = err

`codes.*` · `status.Error` · `status.Errorf` · `ctx.Done()` · `Send` · `UNKNOWN` · `status.Error(codes.X, "...")` · `ctx` · `ctx.Done()` · `stream.Send`