Skip to main content
CoreSDK
Guides

gRPC Services

Use CoreSDK interceptors to authenticate unary and streaming gRPC calls and propagate user context via metadata.

gRPC Services

CoreSDK provides gRPC interceptors for server-side unary and streaming calls. The interceptor extracts the bearer token from gRPC metadata, verifies it, and attaches the resolved VerifiedUser to the request context so your service handlers can access it without repeating auth logic.

How it works

gRPC clients send credentials in the authorization metadata key (lowercase, matching HTTP/2 header conventions). The server interceptor:

  1. Reads authorization from incoming metadata.
  2. Calls sdk.verify_token(token).
  3. On success, injects the user into the RPC context.
  4. On failure, returns UNAUTHENTICATED or PERMISSION_DENIED status.

Rust

// src/grpc_server.rs
use coresdk::{CoreSDK, VerifiedUser};
use tonic::{
    metadata::MetadataValue,
    service::Interceptor,
    Request, Status,
};

#[derive(Clone)]
pub struct AuthInterceptor {
    sdk: CoreSDK,
}

impl AuthInterceptor {
    pub fn new(sdk: CoreSDK) -> Self {
        Self { sdk }
    }
}

impl Interceptor for AuthInterceptor {
    fn call(&mut self, mut req: Request<()>) -> Result<Request<()>, Status> {
        let token = req
            .metadata()
            .get("authorization")
            .and_then(|v| v.to_str().ok())
            .and_then(|v| v.strip_prefix("Bearer "))
            .ok_or_else(|| Status::unauthenticated("Missing bearer token"))?;

        // Block on the async verification — use tokio::task::block_in_place
        // if you need to call this from a sync Interceptor context.
        let user = tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current()
                .block_on(self.sdk.verify_token(token))
        })
        .map_err(|e| Status::unauthenticated(e.to_string()))?;

        req.extensions_mut().insert(user);
        Ok(req)
    }
}

// Retrieve the user in a handler:
pub async fn get_document(
    &self,
    req: Request<GetDocumentRequest>,
) -> Result<Response<Document>, Status> {
    let user = req
        .extensions()
        .get::<VerifiedUser>()
        .ok_or_else(|| Status::internal("User not injected by interceptor"))?;

    // Policy check
    self.sdk.policy.check(PolicyInput {
        principal: user.into(),
        action: "documents:read",
        resource: ResourceRef { r#type: "document" },
    })
    .await
    .map_err(|_| Status::permission_denied("Access denied"))?;

    // ... fetch and return document
}

Python

# interceptor.py
import grpc
from coresdk import CoreSDK, VerifiedUser

USER_CONTEXT_KEY = grpc.experimental.aio.ServicerContext

class CoreSDKInterceptor(grpc.aio.ServerInterceptor):
    def __init__(self, sdk: CoreSDK):
        self.sdk = sdk

    async def intercept_service_async(self, continuation, handler_call_details):
        metadata = dict(handler_call_details.invocation_metadata)
        auth_header = metadata.get("authorization", "")

        if not auth_header.startswith("Bearer "):
            async def abort(request, context):
                await context.abort(grpc.StatusCode.UNAUTHENTICATED, "Missing bearer token")
            return grpc.unary_unary_rpc_method_handler(abort)

        token = auth_header.removeprefix("Bearer ")
        result = await self.sdk.verify_token(token)

        if not result.ok:
            async def abort(request, context):
                await context.abort(grpc.StatusCode.UNAUTHENTICATED, result.error.title)
            return grpc.unary_unary_rpc_method_handler(abort)

        # Attach user to context via trailing metadata (accessible in handlers)
        handler = await continuation(handler_call_details)
        return _wrap_handler_with_user(handler, result.user)


# main.py — wire the interceptor
import asyncio
import grpc
from grpc import aio
from coresdk import CoreSDKClient, SDKConfig

async def serve():
    sdk = CoreSDKClient(SDKConfig.from_env())

    server = aio.server(interceptors=[CoreSDKInterceptor(sdk)])
    add_DocumentServiceServicer_to_server(DocumentService(sdk), server)
    server.add_insecure_port("[::]:50051")
    await server.start()
    await server.wait_for_termination()

asyncio.run(serve())

Go

// interceptor.go
package auth

import (
    "context"
    "strings"

    coresdk "github.com/coresdk-dev/sdk-go"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/metadata"
    "google.golang.org/grpc/status"
)

type contextKey string

const userContextKey contextKey = "coresdk_user"

func UnaryAuthInterceptor(sdk *coresdk.SDK) grpc.UnaryServerInterceptor {
    return func(
        ctx context.Context,
        req interface{},
        info *grpc.UnaryServerInfo,
        handler grpc.UnaryHandler,
    ) (interface{}, error) {
        user, err := extractAndVerify(ctx, sdk)
        if err != nil {
            return nil, err
        }
        ctx = context.WithValue(ctx, userContextKey, user)
        return handler(ctx, req)
    }
}

func StreamAuthInterceptor(sdk *coresdk.SDK) grpc.StreamServerInterceptor {
    return func(
        srv interface{},
        ss grpc.ServerStream,
        info *grpc.StreamServerInfo,
        handler grpc.StreamHandler,
    ) error {
        user, err := extractAndVerify(ss.Context(), sdk)
        if err != nil {
            return err
        }
        wrapped := &wrappedStream{ss, context.WithValue(ss.Context(), userContextKey, user)}
        return handler(srv, wrapped)
    }
}

func extractAndVerify(ctx context.Context, sdk *coresdk.SDK) (*coresdk.VerifiedUser, error) {
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return nil, status.Error(codes.Unauthenticated, "missing metadata")
    }

    vals := md.Get("authorization")
    if len(vals) == 0 || !strings.HasPrefix(vals[0], "Bearer ") {
        return nil, status.Error(codes.Unauthenticated, "missing bearer token")
    }

    token := strings.TrimPrefix(vals[0], "Bearer ")
    result, err := sdk.VerifyToken(ctx, token)
    if err != nil || !result.OK {
        return nil, status.Error(codes.Unauthenticated, "invalid token")
    }

    return result.User, nil
}

// UserFromContext retrieves the verified user in a handler.
func UserFromContext(ctx context.Context) (*coresdk.VerifiedUser, bool) {
    u, ok := ctx.Value(userContextKey).(*coresdk.VerifiedUser)
    return u, ok
}

// wrappedStream carries an enriched context through streaming calls.
type wrappedStream struct {
    grpc.ServerStream
    ctx context.Context
}

func (w *wrappedStream) Context() context.Context { return w.ctx }
// server.go — register interceptors
func main() {
    sdk, _ := coresdk.NewSDK(coresdk.Config{
        Tenant:  "acme",
        JWKSUrl: "https://your-idp.com/.well-known/jwks.json",
    })

    grpcServer := grpc.NewServer(
        grpc.UnaryInterceptor(auth.UnaryAuthInterceptor(sdk)),
        grpc.StreamInterceptor(auth.StreamAuthInterceptor(sdk)),
    )

    pb.RegisterDocumentServiceServer(grpcServer, &DocumentService{sdk: sdk})
    lis, _ := net.Listen("tcp", ":50051")
    grpcServer.Serve(lis)
}

TypeScript

// interceptor.ts
import {
  ServerInterceptingCall,
  ServerInterceptor,
  ServerListener,
  InterceptingCall,
  Metadata,
  status,
} from "@grpc/grpc-js";
import { sdk } from "./sdk";

export const authInterceptor: ServerInterceptor = (methodDescriptor, call) => {
  return new ServerInterceptingCall(call, {
    start(metadata, listener, next) {
      const authHeader =
        metadata.get("authorization")[0]?.toString() ?? "";

      if (!authHeader.startsWith("Bearer ")) {
        call.sendStatus({
          code: status.UNAUTHENTICATED,
          details: "Missing bearer token",
          metadata: new Metadata(),
        });
        return;
      }

      const token = authHeader.slice(7);

      sdk.verifyToken(token).then((result) => {
        if (!result.ok) {
          call.sendStatus({
            code: status.UNAUTHENTICATED,
            details: result.error.title,
            metadata: new Metadata(),
          });
          return;
        }

        // Attach user to metadata so handlers can read it
        metadata.set("x-user-id", result.user.id);
        metadata.set("x-user-role", result.user.role ?? "");
        next(metadata, listener);
      });
    },
  });
};
// server.ts
import * as grpc from "@grpc/grpc-js";
import { authInterceptor } from "./interceptor";
import { DocumentServiceImpl } from "./services/document";

const server = new grpc.Server({
  interceptors: [authInterceptor],
});

server.addService(DocumentServiceService, new DocumentServiceImpl());
server.bindAsync("0.0.0.0:50051", grpc.ServerCredentials.createInsecure(), () => {
  server.start();
});

Policy checking on RPC methods

After the interceptor attaches the user to the request context, handlers call sdk.policy.check before operating on data. This keeps policy logic centralized and testable independently of transport concerns.

// Go handler example
func (s *DocumentService) GetDocument(ctx context.Context, req *pb.GetDocumentRequest) (*pb.Document, error) {
    user, ok := auth.UserFromContext(ctx)
    if !ok {
        return nil, status.Error(codes.Internal, "user not in context")
    }

    allowed, err := s.sdk.Policy.Check(ctx, coresdk.PolicyInput{
        Principal: coresdk.Principal{ID: user.ID, Role: user.Role},
        Action:    "documents:read",
        Resource:  coresdk.ResourceRef{Type: "document", ID: req.DocumentId},
    })
    if err != nil || !allowed {
        return nil, status.Error(codes.PermissionDenied, "access denied")
    }

    return s.store.GetDocument(ctx, req.DocumentId)
}

Summary

LanguageInterceptor typeUser propagation
Rusttonic::service::InterceptorRequest::extensions_mut()
Pythongrpc.aio.ServerInterceptorWrapped handler context
Gogrpc.UnaryServerInterceptor / StreamServerInterceptorcontext.WithValue
TypeScriptgrpc.ServerInterceptorMetadata headers

On this page