Loading...
Loading...
Comprehensive gRPC microservices skill covering protobuf schemas, service definitions, streaming patterns, interceptors, load balancing, and production gRPC architecture
npx skill4agent add manutej/luxor-claude-marketplace grpc-microservicessyntax = "proto3";
message User {
int32 id = 1;
string name = 2;
string email = 3;
}.protoservice UserService {
rpc GetUser(GetUserRequest) returns (GetUserResponse);
rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
}rpc GetUser(GetUserRequest) returns (GetUserResponse);rpc ListUsers(ListUsersRequest) returns (stream User);rpc CreateUsers(stream CreateUserRequest) returns (CreateUsersResponse);rpc Chat(stream ChatMessage) returns (stream ChatMessage);message User {
int32 id = 1; // Never change this number
string name = 2; // Never change this number
string email = 3; // Never change this number
// int32 age = 4; // DEPRECATED - don't reuse 4
string phone = 5; // New field - use next available
}enum UserRole {
USER_ROLE_UNSPECIFIED = 0; // Always have a zero value
USER_ROLE_ADMIN = 1;
USER_ROLE_MODERATOR = 2;
USER_ROLE_MEMBER = 3;
}
message User {
int32 id = 1;
string name = 2;
UserRole role = 3;
}message User {
int32 id = 1;
string name = 2;
message Address {
string street = 1;
string city = 2;
string state = 3;
string zip = 4;
}
Address address = 3;
repeated Address additional_addresses = 4;
}repeatedmessage UserList {
repeated User users = 1;
}
message User {
int32 id = 1;
string name = 2;
repeated string tags = 3;
}oneofmessage SearchRequest {
string query = 1;
oneof filter {
string category = 2;
int32 user_id = 3;
string tag = 4;
}
}google.protobufimport "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";
message Event {
string id = 1;
string name = 2;
google.protobuf.Timestamp created_at = 3;
google.protobuf.Duration duration = 4;
google.protobuf.Int32Value optional_count = 5;
}service UserService {
// Get single resource
rpc GetUser(GetUserRequest) returns (User);
// List resources
rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);
// Create resource
rpc CreateUser(CreateUserRequest) returns (User);
// Update resource
rpc UpdateUser(UpdateUserRequest) returns (User);
// Delete resource
rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty);
}message ListUsersRequest {
int32 page_size = 1;
string page_token = 2;
string filter = 3;
}
message ListUsersResponse {
repeated User users = 1;
string next_page_token = 2;
int32 total_count = 3;
}message BatchGetUsersRequest {
repeated int32 user_ids = 1;
}
message BatchGetUsersResponse {
map<int32, User> users = 1;
repeated int32 not_found = 2;
}import "google/longrunning/operations.proto";
service BatchJobService {
rpc ProcessBatch(BatchRequest) returns (google.longrunning.Operation);
rpc GetOperation(GetOperationRequest) returns (google.longrunning.Operation);
}service ProductService {
rpc SearchProducts(SearchRequest) returns (stream Product);
}
message SearchRequest {
string query = 1;
int32 limit = 2;
}func (s *server) SearchProducts(req *pb.SearchRequest, stream pb.ProductService_SearchProductsServer) error {
products := s.db.Search(req.Query, req.Limit)
for _, product := range products {
if err := stream.Send(&product); err != nil {
return err
}
}
return nil
}service EventService {
rpc SubscribeToEvents(SubscribeRequest) returns (stream Event);
}
message SubscribeRequest {
repeated string event_types = 1;
google.protobuf.Timestamp since = 2;
}service LogService {
rpc TailLogs(TailRequest) returns (stream LogEntry);
}
message TailRequest {
string service_name = 1;
string level = 2;
int32 lines = 3;
}service UploadService {
rpc UploadImages(stream ImageChunk) returns (UploadSummary);
}
message ImageChunk {
string filename = 1;
bytes data = 2;
int32 chunk_number = 3;
}
message UploadSummary {
int32 total_images = 1;
int64 total_bytes = 2;
repeated string uploaded_filenames = 3;
}func (s *server) UploadImages(stream pb.UploadService_UploadImagesServer) error {
var count int32
var totalBytes int64
var filenames []string
for {
chunk, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.UploadSummary{
TotalImages: count,
TotalBytes: totalBytes,
UploadedFilenames: filenames,
})
}
if err != nil {
return err
}
// Process chunk
totalBytes += int64(len(chunk.Data))
if chunk.ChunkNumber == 0 {
count++
filenames = append(filenames, chunk.Filename)
}
}
}service AnalyticsService {
rpc RecordMetrics(stream Metric) returns (AggregateResult);
}
message Metric {
string name = 1;
double value = 2;
google.protobuf.Timestamp timestamp = 3;
}service ChatService {
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}
message ChatMessage {
string user_id = 1;
string room_id = 2;
string content = 3;
google.protobuf.Timestamp timestamp = 4;
}func (s *server) Chat(stream pb.ChatService_ChatServer) error {
// Create channel for this client
clientID := uuid.New().String()
msgChan := make(chan *pb.ChatMessage, 10)
// Register client
s.mu.Lock()
s.clients[clientID] = msgChan
s.mu.Unlock()
defer func() {
s.mu.Lock()
delete(s.clients, clientID)
close(msgChan)
s.mu.Unlock()
}()
// Goroutine to send messages to client
go func() {
for msg := range msgChan {
if err := stream.Send(msg); err != nil {
return
}
}
}()
// Receive messages from client
for {
msg, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
// Broadcast to all clients in room
s.broadcast(msg)
}
}service CollaborationService {
rpc Collaborate(stream DocumentEdit) returns (stream DocumentEdit);
}
message DocumentEdit {
string document_id = 1;
string user_id = 2;
int32 position = 3;
string content = 4;
enum Operation {
OPERATION_UNSPECIFIED = 0;
OPERATION_INSERT = 1;
OPERATION_DELETE = 2;
OPERATION_UPDATE = 3;
}
Operation operation = 5;
}service GameService {
rpc PlayGame(stream GameAction) returns (stream GameState);
}
message GameAction {
string player_id = 1;
string game_id = 2;
string action_type = 3;
bytes action_data = 4;
}
message GameState {
string game_id = 1;
repeated PlayerState players = 2;
bytes world_state = 3;
google.protobuf.Timestamp timestamp = 4;
}func UnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
// Pre-processing
start := time.Now()
log.Printf("Method: %s, Start: %v", info.FullMethod, start)
// Call the handler
resp, err := handler(ctx, req)
// Post-processing
duration := time.Since(start)
log.Printf("Method: %s, Duration: %v, Error: %v",
info.FullMethod, duration, err)
return resp, err
}
}
// Usage
server := grpc.NewServer(
grpc.UnaryInterceptor(UnaryServerInterceptor()),
)func UnaryClientInterceptor() grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
start := time.Now()
// Call the remote method
err := invoker(ctx, method, req, reply, cc, opts...)
log.Printf("Method: %s, Duration: %v, Error: %v",
method, time.Since(start), err)
return err
}
}
// Usage
conn, err := grpc.Dial(
address,
grpc.WithUnaryInterceptor(UnaryClientInterceptor()),
)func StreamServerInterceptor() grpc.StreamServerInterceptor {
return func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
log.Printf("Stream started: %s", info.FullMethod)
err := handler(srv, ss)
log.Printf("Stream ended: %s, Error: %v", info.FullMethod, err)
return err
}
}func AuthInterceptor(secret string) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
// Extract metadata
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "no metadata")
}
// Get authorization token
tokens := md["authorization"]
if len(tokens) == 0 {
return nil, status.Error(codes.Unauthenticated, "no token")
}
// Validate token
token := tokens[0]
claims, err := validateJWT(token, secret)
if err != nil {
return nil, status.Error(codes.Unauthenticated, "invalid token")
}
// Add claims to context
ctx = context.WithValue(ctx, "claims", claims)
return handler(ctx, req)
}
}func LoggingInterceptor(logger *log.Logger) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
start := time.Now()
// Get request ID from metadata
requestID := getRequestID(ctx)
logger.Printf("[%s] Request: %s", requestID, info.FullMethod)
resp, err := handler(ctx, req)
duration := time.Since(start)
statusCode := status.Code(err)
logger.Printf("[%s] Response: %s, Duration: %v, Status: %v",
requestID, info.FullMethod, duration, statusCode)
return resp, err
}
}func RateLimitInterceptor(limiter *rate.Limiter) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
if !limiter.Allow() {
return nil, status.Error(
codes.ResourceExhausted,
"rate limit exceeded",
)
}
return handler(ctx, req)
}
}func TracingInterceptor(tracer trace.Tracer) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
ctx, span := tracer.Start(ctx, info.FullMethod)
defer span.End()
// Add attributes
span.SetAttributes(
attribute.String("rpc.method", info.FullMethod),
attribute.String("rpc.service", "MyService"),
)
resp, err := handler(ctx, req)
if err != nil {
span.RecordError(err)
span.SetStatus(codes2.Error, err.Error())
} else {
span.SetStatus(codes2.Ok, "")
}
return resp, err
}
}func RecoveryInterceptor() grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
defer func() {
if r := recover(); r != nil {
log.Printf("Panic recovered: %v\n%s", r, debug.Stack())
err = status.Error(codes.Internal, "internal server error")
}
}()
return handler(ctx, req)
}
}server := grpc.NewServer(
grpc.ChainUnaryInterceptor(
RecoveryInterceptor(),
LoggingInterceptor(logger),
TracingInterceptor(tracer),
AuthInterceptor(jwtSecret),
RateLimitInterceptor(limiter),
),
grpc.ChainStreamInterceptor(
StreamRecoveryInterceptor(),
StreamLoggingInterceptor(logger),
),
)import "google.golang.org/grpc/balancer/roundrobin"
conn, err := grpc.Dial(
"dns:///my-service.example.com:50051",
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
grpc.WithInsecure(),
)conn, err := grpc.Dial(
"dns:///my-service.example.com:50051",
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"pick_first"}`),
grpc.WithInsecure(),
)type exampleResolver struct {
target resolver.Target
cc resolver.ClientConn
addrsStore map[string][]string
}
func (r *exampleResolver) ResolveNow(resolver.ResolveNowOptions) {
// Discover service addresses
addresses := r.discoverServices()
var addrs []resolver.Address
for _, addr := range addresses {
addrs = append(addrs, resolver.Address{Addr: addr})
}
r.cc.UpdateState(resolver.State{Addresses: addrs})
}
func init() {
resolver.Register(&exampleResolverBuilder{})
}apiVersion: v1
kind: Service
metadata:
name: grpc-service
spec:
selector:
app: grpc-app
ports:
- name: grpc
port: 50051
targetPort: 50051
protocol: TCP
type: ClusterIP
---
apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
name: grpc-service
spec:
host: grpc-service
trafficPolicy:
loadBalancer:
simple: ROUND_ROBIN
connectionPool:
http:
http2MaxRequests: 1000
maxRequestsPerConnection: 10service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}
message HealthCheckRequest {
string service = 1;
}
message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
SERVICE_UNKNOWN = 3;
}
ServingStatus status = 1;
}import "google.golang.org/grpc/health"
import healthpb "google.golang.org/grpc/health/grpc_health_v1"
healthServer := health.NewServer()
healthpb.RegisterHealthServer(grpcServer, healthServer)
// Set service status
healthServer.SetServingStatus("UserService", healthpb.HealthCheckResponse_SERVING)import "google.golang.org/grpc/codes"
import "google.golang.org/grpc/status"
// Return errors with appropriate codes
func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
if req.Id <= 0 {
return nil, status.Error(codes.InvalidArgument, "id must be positive")
}
user, err := s.db.GetUser(req.Id)
if err == sql.ErrNoRows {
return nil, status.Error(codes.NotFound, "user not found")
}
if err != nil {
return nil, status.Error(codes.Internal, "database error")
}
return user, nil
}OKCanceledUnknownInvalidArgumentDeadlineExceededNotFoundAlreadyExistsPermissionDeniedResourceExhaustedFailedPreconditionAbortedOutOfRangeUnimplementedInternalUnavailableDataLossUnauthenticatedimport "google.golang.org/genproto/googleapis/rpc/errdetails"
func (s *server) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.User, error) {
// Validate request
violations := validateCreateUserRequest(req)
if len(violations) > 0 {
badRequest := &errdetails.BadRequest{}
for field, msg := range violations {
badRequest.FieldViolations = append(
badRequest.FieldViolations,
&errdetails.BadRequest_FieldViolation{
Field: field,
Description: msg,
},
)
}
st := status.New(codes.InvalidArgument, "invalid request")
st, _ = st.WithDetails(badRequest)
return nil, st.Err()
}
// Create user...
}resp, err := client.CreateUser(ctx, req)
if err != nil {
st := status.Convert(err)
for _, detail := range st.Details() {
switch t := detail.(type) {
case *errdetails.BadRequest:
for _, violation := range t.FieldViolations {
fmt.Printf("Invalid field %s: %s\n",
violation.Field, violation.Description)
}
}
}
}func (s *server) ProcessOrder(ctx context.Context, req *pb.OrderRequest) (*pb.OrderResponse, error) {
// Call inventory service
inventory, err := s.inventoryClient.CheckInventory(ctx, &pb.InventoryRequest{
ProductId: req.ProductId,
})
if err != nil {
// Propagate error with additional context
st := status.Convert(err)
return nil, status.Errorf(st.Code(),
"inventory check failed: %v", st.Message())
}
// Continue processing...
}import "google.golang.org/grpc/codes"
import "google.golang.org/grpc/status"
func CallWithRetry(ctx context.Context, maxRetries int, fn func() error) error {
var err error
for i := 0; i < maxRetries; i++ {
err = fn()
if err == nil {
return nil
}
// Check if error is retryable
st := status.Convert(err)
if !isRetryable(st.Code()) {
return err
}
// Exponential backoff
backoff := time.Duration(math.Pow(2, float64(i))) * time.Second
time.Sleep(backoff)
}
return err
}
func isRetryable(code codes.Code) bool {
return code == codes.Unavailable ||
code == codes.DeadlineExceeded ||
code == codes.ResourceExhausted
}syntax = "proto3"reservedmessage User {
int32 id = 1;
string name = 2;
// string age = 3; // Deprecated
reserved 3;
reserved "age";
string email = 4;
google.protobuf.Int32Value phone = 5; // Optional
}// Reuse connections
var conn *grpc.ClientConn
var once sync.Once
func getConnection() *grpc.ClientConn {
once.Do(func() {
var err error
conn, err = grpc.Dial(
address,
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 3 * time.Second,
PermitWithoutStream: true,
}),
)
if err != nil {
log.Fatal(err)
}
})
return conn
}type ConnectionPool struct {
connections []*grpc.ClientConn
next uint32
}
func (p *ConnectionPool) GetConnection() *grpc.ClientConn {
n := atomic.AddUint32(&p.next, 1)
return p.connections[n%uint32(len(p.connections))]
}// Instead of this:
rpc GetAllUsers(Empty) returns (UserList); // Large response
// Use this:
rpc ListUsers(ListUsersRequest) returns (stream User); // Streamed// Server-side
creds, err := credentials.NewServerTLSFromFile(certFile, keyFile)
server := grpc.NewServer(grpc.Creds(creds))
// Client-side
creds, err := credentials.NewClientTLSFromFile(certFile, "")
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(creds))cert, err := tls.LoadX509KeyPair(certFile, keyFile)
certPool := x509.NewCertPool()
ca, err := ioutil.ReadFile(caFile)
certPool.AppendCertsFromPEM(ca)
creds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{cert},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: certPool,
})
server := grpc.NewServer(grpc.Creds(creds))type tokenAuth struct {
token string
}
func (t tokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"authorization": "Bearer " + t.token,
}, nil
}
func (t tokenAuth) RequireTransportSecurity() bool {
return true
}
// Usage
conn, err := grpc.Dial(
address,
grpc.WithPerRPCCredentials(tokenAuth{token: "my-token"}),
)// Set deadline for request
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := client.GetUser(ctx, &pb.GetUserRequest{Id: 123})
if err != nil {
if status.Code(err) == codes.DeadlineExceeded {
log.Println("Request timed out")
}
}func (s *server) ComplexOperation(ctx context.Context, req *pb.Request) (*pb.Response, error) {
// Check if deadline is already exceeded
deadline, ok := ctx.Deadline()
if ok && time.Now().After(deadline) {
return nil, status.Error(codes.DeadlineExceeded, "deadline exceeded")
}
// Propagate context to downstream calls
user, err := s.userClient.GetUser(ctx, &pb.GetUserRequest{Id: req.UserId})
if err != nil {
return nil, err
}
// Continue with remaining time...
}import "github.com/grpc-ecosystem/go-grpc-prometheus"
// Server metrics
grpcMetrics := grpc_prometheus.NewServerMetrics()
server := grpc.NewServer(
grpc.UnaryInterceptor(grpcMetrics.UnaryServerInterceptor()),
grpc.StreamInterceptor(grpcMetrics.StreamServerInterceptor()),
)
grpcMetrics.InitializeMetrics(server)
// Expose metrics
http.Handle("/metrics", promhttp.Handler())server := grpc.NewServer()
// Register services...
go func() {
if err := server.Serve(listener); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}()
// Wait for interrupt signal
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
<-quit
log.Println("Shutting down server...")
// Graceful shutdown
server.GracefulStop()
log.Println("Server stopped")package api.v1;
service UserServiceV1 {
rpc GetUser(GetUserRequest) returns (User);
}
package api.v2;
service UserServiceV2 {
rpc GetUser(GetUserRequest) returns (User);
}message User {
int32 id = 1;
string name = 2;
string email = 3;
// v2 additions
string phone = 4;
Address address = 5;
}type mockUserClient struct {
pb.UserServiceClient
getUserFunc func(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error)
}
func (m *mockUserClient) GetUser(ctx context.Context, req *pb.GetUserRequest, opts ...grpc.CallOption) (*pb.User, error) {
return m.getUserFunc(ctx, req)
}
func TestOrderService(t *testing.T) {
mockClient := &mockUserClient{
getUserFunc: func(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
return &pb.User{Id: 1, Name: "Test User"}, nil
},
}
// Test with mock...
}func TestIntegration(t *testing.T) {
// Start test server
lis, err := net.Listen("tcp", ":0")
require.NoError(t, err)
server := grpc.NewServer()
pb.RegisterUserServiceServer(server, &userServer{})
go server.Serve(lis)
defer server.Stop()
// Connect client
conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
require.NoError(t, err)
defer conn.Close()
client := pb.NewUserServiceClient(conn)
// Test requests...
}FROM golang:1.21-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o server ./cmd/server
FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/
COPY /app/server .
COPY /app/proto ./proto
EXPOSE 50051
CMD ["./server"]apiVersion: apps/v1
kind: Deployment
metadata:
name: grpc-service
spec:
replicas: 3
selector:
matchLabels:
app: grpc-service
template:
metadata:
labels:
app: grpc-service
spec:
containers:
- name: grpc-service
image: grpc-service:latest
ports:
- containerPort: 50051
name: grpc
protocol: TCP
env:
- name: PORT
value: "50051"
livenessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:50051"]
initialDelaySeconds: 10
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:50051"]
initialDelaySeconds: 5
resources:
requests:
memory: "128Mi"
cpu: "100m"
limits:
memory: "512Mi"
cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:
name: grpc-service
spec:
selector:
app: grpc-service
ports:
- port: 50051
targetPort: 50051
protocol: TCP
name: grpc
type: ClusterIPapiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
name: grpc-service
spec:
hosts:
- grpc-service
http:
- match:
- headers:
version:
exact: v2
route:
- destination:
host: grpc-service
subset: v2
- route:
- destination:
host: grpc-service
subset: v1