gRPC Services
Use CoreSDK interceptors to authenticate unary and streaming gRPC calls and propagate user context via metadata.
gRPC Services
CoreSDK provides gRPC interceptors for server-side unary and streaming calls. The interceptor extracts the bearer token from gRPC metadata, verifies it, and attaches the resolved VerifiedUser to the request context so your service handlers can access it without repeating auth logic.
How it works
gRPC clients send credentials in the authorization metadata key (lowercase, matching HTTP/2 header conventions). The server interceptor:
- Reads `authorization` from incoming metadata.
- Calls `sdk.verify_token(token)`.
- On success, injects the user into the RPC context.
- On failure, returns `UNAUTHENTICATED` or `PERMISSION_DENIED` status.
Rust
// src/grpc_server.rs
use coresdk::{CoreSDK, VerifiedUser};
use tonic::{
metadata::MetadataValue,
service::Interceptor,
Request, Status,
};
/// gRPC interceptor that authenticates incoming requests through a [`CoreSDK`] handle.
#[derive(Clone)]
pub struct AuthInterceptor {
    /// SDK handle used to verify bearer tokens.
    sdk: CoreSDK,
}

impl AuthInterceptor {
    /// Builds an interceptor that verifies tokens through `sdk`.
    pub fn new(sdk: CoreSDK) -> Self {
        AuthInterceptor { sdk }
    }
}
impl Interceptor for AuthInterceptor {
    /// Authenticates a request from its `authorization` metadata entry.
    ///
    /// On success the verified user is stored in the request extensions so
    /// handlers can read it back with `req.extensions().get::<VerifiedUser>()`.
    ///
    /// # Errors
    ///
    /// Returns `Status::unauthenticated` when the `authorization` entry is
    /// missing, is not valid text, does not start with `"Bearer "`, or when
    /// token verification fails.
    fn call(&mut self, mut req: Request<()>) -> Result<Request<()>, Status> {
        let header = req.metadata().get("authorization");
        let bearer = header
            .and_then(|value| value.to_str().ok())
            .and_then(|text| text.strip_prefix("Bearer "));
        let Some(token) = bearer else {
            return Err(Status::unauthenticated("Missing bearer token"));
        };

        // `Interceptor::call` is synchronous while verification is async, so
        // the future is driven to completion on the current runtime handle.
        // NOTE: `block_in_place` needs the multi-threaded Tokio runtime; it
        // panics on a current-thread runtime.
        let verification = tokio::task::block_in_place(|| {
            let handle = tokio::runtime::Handle::current();
            handle.block_on(self.sdk.verify_token(token))
        });
        let user = match verification {
            Ok(user) => user,
            Err(err) => return Err(Status::unauthenticated(err.to_string())),
        };

        req.extensions_mut().insert(user);
        Ok(req)
    }
}
// Retrieve the user in a handler:
/// Handler sketch: reads the user injected by the interceptor, then runs a
/// `documents:read` policy check before touching any data.
///
/// # Errors
///
/// Returns `Status::internal` when no `VerifiedUser` is present in the request
/// extensions, and `Status::permission_denied` when the policy check errors.
pub async fn get_document(
    &self,
    req: Request<GetDocumentRequest>,
) -> Result<Response<Document>, Status> {
    let Some(user) = req.extensions().get::<VerifiedUser>() else {
        return Err(Status::internal("User not injected by interceptor"));
    };

    // Policy check
    // NOTE(review): only the `Err` case denies access; the `Ok` value is
    // discarded. If `check` returns a boolean decision (as the Go example on
    // this page suggests), confirm that a negative decision surfaces as `Err`.
    let input = PolicyInput {
        principal: user.into(),
        action: "documents:read",
        resource: ResourceRef { r#type: "document" },
    };
    if self.sdk.policy.check(input).await.is_err() {
        return Err(Status::permission_denied("Access denied"));
    }

    // ... fetch and return document
}
Python
# interceptor.py
import grpc
from coresdk import CoreSDK, VerifiedUser
# NOTE(review): this name is not referenced anywhere in the visible code, and
# its value appears to be a context class rather than a lookup key — confirm
# whether it is still needed.
USER_CONTEXT_KEY = grpc.experimental.aio.ServicerContext
class CoreSDKInterceptor(grpc.aio.ServerInterceptor):
    """Server interceptor that authenticates every RPC from its bearer token.

    The token is read from the ``authorization`` invocation metadata entry and
    verified with ``sdk.verify_token``. Calls that fail authentication are
    answered with ``UNAUTHENTICATED``; otherwise the real handler is wrapped so
    it can see the verified user.
    """

    def __init__(self, sdk: CoreSDK):
        self.sdk = sdk

    @staticmethod
    def _reject(details: str):
        """Return a handler that aborts the RPC with UNAUTHENTICATED and ``details``."""

        async def abort(request, context):
            await context.abort(grpc.StatusCode.UNAUTHENTICATED, details)

        return grpc.unary_unary_rpc_method_handler(abort)

    async def intercept_service(self, continuation, handler_call_details):
        """Authenticate the call, then hand over to the next handler.

        ``grpc.aio.ServerInterceptor`` declares ``intercept_service`` as its
        abstract hook, so the method must carry exactly this name for the
        class to be instantiable and for the server to invoke it.
        """
        metadata = dict(handler_call_details.invocation_metadata)
        auth_header = metadata.get("authorization", "")
        if not auth_header.startswith("Bearer "):
            return self._reject("Missing bearer token")

        token = auth_header.removeprefix("Bearer ")
        result = await self.sdk.verify_token(token)
        if not result.ok:
            return self._reject(result.error.title)

        # Resolve the real handler and wrap it so the verified user is
        # available to it.
        handler = await continuation(handler_call_details)
        return _wrap_handler_with_user(handler, result.user)

    # Backward-compatible alias for the method's previous name.
    intercept_service_async = intercept_service
# main.py — wire the interceptor
import asyncio
import grpc
from grpc import aio
from coresdk import CoreSDKClient, SDKConfig
async def serve():
    """Start the document gRPC server behind the CoreSDK interceptor and run until it terminates."""
    client = CoreSDKClient(SDKConfig.from_env())
    grpc_server = aio.server(interceptors=[CoreSDKInterceptor(client)])
    add_DocumentServiceServicer_to_server(DocumentService(client), grpc_server)
    # Plaintext listener on port 50051.
    grpc_server.add_insecure_port("[::]:50051")
    await grpc_server.start()
    await grpc_server.wait_for_termination()


asyncio.run(serve())
Go
// interceptor.go
package auth
import (
"context"
"strings"
coresdk "github.com/coresdk-dev/sdk-go"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
// contextKey is the package-private type of the context keys defined here.
type contextKey string
// userContextKey is the context key under which the interceptors store the
// verified user and from which UserFromContext reads it.
const userContextKey contextKey = "coresdk_user"
// UnaryAuthInterceptor returns a unary server interceptor that verifies the
// caller's bearer token with sdk. On success the verified user is stored in
// the context handed to the handler; on failure the handler is not invoked
// and the verification error is returned.
func UnaryAuthInterceptor(sdk *coresdk.SDK) grpc.UnaryServerInterceptor {
	intercept := func(
		ctx context.Context,
		req interface{},
		_ *grpc.UnaryServerInfo,
		next grpc.UnaryHandler,
	) (interface{}, error) {
		verified, verifyErr := extractAndVerify(ctx, sdk)
		if verifyErr != nil {
			return nil, verifyErr
		}
		return next(context.WithValue(ctx, userContextKey, verified), req)
	}
	return intercept
}
// StreamAuthInterceptor returns a streaming server interceptor that verifies
// the caller's bearer token with sdk. On success the handler receives a stream
// whose Context carries the verified user; on failure the handler is not
// invoked and the verification error is returned.
func StreamAuthInterceptor(sdk *coresdk.SDK) grpc.StreamServerInterceptor {
	intercept := func(
		srv interface{},
		stream grpc.ServerStream,
		_ *grpc.StreamServerInfo,
		next grpc.StreamHandler,
	) error {
		verified, verifyErr := extractAndVerify(stream.Context(), sdk)
		if verifyErr != nil {
			return verifyErr
		}
		// ServerStream exposes no way to replace its context, so the stream is
		// wrapped to return the enriched one.
		authedCtx := context.WithValue(stream.Context(), userContextKey, verified)
		return next(srv, &wrappedStream{ServerStream: stream, ctx: authedCtx})
	}
	return intercept
}
// extractAndVerify reads the first "authorization" value from the incoming
// gRPC metadata in ctx, strips the "Bearer " prefix and verifies the token
// with sdk. Every failure is reported as an Unauthenticated status error.
func extractAndVerify(ctx context.Context, sdk *coresdk.SDK) (*coresdk.VerifiedUser, error) {
	const bearerPrefix = "Bearer "

	md, found := metadata.FromIncomingContext(ctx)
	if !found {
		return nil, status.Error(codes.Unauthenticated, "missing metadata")
	}

	headers := md.Get("authorization")
	if len(headers) == 0 {
		return nil, status.Error(codes.Unauthenticated, "missing bearer token")
	}
	header := headers[0]
	if !strings.HasPrefix(header, bearerPrefix) {
		return nil, status.Error(codes.Unauthenticated, "missing bearer token")
	}

	result, err := sdk.VerifyToken(ctx, strings.TrimPrefix(header, bearerPrefix))
	if err != nil {
		return nil, status.Error(codes.Unauthenticated, "invalid token")
	}
	if !result.OK {
		return nil, status.Error(codes.Unauthenticated, "invalid token")
	}
	return result.User, nil
}
// UserFromContext returns the verified user stored in ctx under
// userContextKey. The boolean is false when no value of that type is present.
func UserFromContext(ctx context.Context) (*coresdk.VerifiedUser, bool) {
	if user, isUser := ctx.Value(userContextKey).(*coresdk.VerifiedUser); isUser {
		return user, true
	}
	return nil, false
}
// wrappedStream embeds a grpc.ServerStream and overrides Context so that
// streaming handlers observe the enriched context.
type wrappedStream struct {
	grpc.ServerStream
	// ctx is the context returned by Context in place of the embedded stream's.
	ctx context.Context
}

// Context returns the enriched context stored on the wrapper.
func (s *wrappedStream) Context() context.Context {
	return s.ctx
}

// server.go — register interceptors
func main() {
sdk, _ := coresdk.NewSDK(coresdk.Config{
Tenant: "acme",
JWKSUrl: "https://your-idp.com/.well-known/jwks.json",
})
grpcServer := grpc.NewServer(
grpc.UnaryInterceptor(auth.UnaryAuthInterceptor(sdk)),
grpc.StreamInterceptor(auth.StreamAuthInterceptor(sdk)),
)
pb.RegisterDocumentServiceServer(grpcServer, &DocumentService{sdk: sdk})
lis, _ := net.Listen("tcp", ":50051")
grpcServer.Serve(lis)
}TypeScript
// interceptor.ts
import {
ServerInterceptingCall,
ServerInterceptor,
ServerListener,
InterceptingCall,
Metadata,
status,
} from "@grpc/grpc-js";
import { sdk } from "./sdk";
/**
 * Server interceptor that authenticates each call from its bearer token.
 *
 * grpc-js server interceptors receive incoming metadata through the listener's
 * `onReceiveMetadata` hook (the responder's `start` only receives `next`), so
 * the check runs there. On success the verified user's id and role are written
 * into the metadata passed on to the handler; on failure the call is ended
 * with `UNAUTHENTICATED`.
 */
export const authInterceptor: ServerInterceptor = (methodDescriptor, call) => {
  // Ends the call with UNAUTHENTICATED and the given detail message.
  const reject = (details: string): void => {
    call.sendStatus({
      code: status.UNAUTHENTICATED,
      details,
      metadata: new Metadata(),
    });
  };

  return new ServerInterceptingCall(call, {
    start(next) {
      next({
        onReceiveMetadata(metadata, forward) {
          const authHeader =
            metadata.get("authorization")[0]?.toString() ?? "";
          if (!authHeader.startsWith("Bearer ")) {
            reject("Missing bearer token");
            return;
          }

          const token = authHeader.slice("Bearer ".length);
          sdk.verifyToken(token).then(
            (result) => {
              if (!result.ok) {
                reject(result.error.title);
                return;
              }
              // Attach user to metadata so handlers can read it. `set`
              // replaces any value the client sent under the same key.
              metadata.set("x-user-id", result.user.id);
              metadata.set("x-user-role", result.user.role ?? "");
              forward(metadata);
            },
            () => {
              // A rejected verification must still end the call; otherwise
              // the client would wait until its deadline.
              reject("Token verification failed");
            },
          );
        },
      });
    },
  });
};
// server.ts
import * as grpc from "@grpc/grpc-js";
import { authInterceptor } from "./interceptor";
import { DocumentServiceImpl } from "./services/document";
// Create the server with the auth interceptor applied to every call.
const server = new grpc.Server({
  interceptors: [authInterceptor],
});
server.addService(DocumentServiceService, new DocumentServiceImpl());
server.bindAsync("0.0.0.0:50051", grpc.ServerCredentials.createInsecure(), (err) => {
  // Fail loudly when the port cannot be bound instead of starting a server
  // that is not listening.
  if (err) {
    throw err;
  }
  server.start();
});
Policy checking on RPC methods
After the interceptor attaches the user to the request context, handlers call sdk.policy.check before operating on data. This keeps policy logic centralized and testable independently of transport concerns.
// Go handler example
func (s *DocumentService) GetDocument(ctx context.Context, req *pb.GetDocumentRequest) (*pb.Document, error) {
user, ok := auth.UserFromContext(ctx)
if !ok {
return nil, status.Error(codes.Internal, "user not in context")
}
allowed, err := s.sdk.Policy.Check(ctx, coresdk.PolicyInput{
Principal: coresdk.Principal{ID: user.ID, Role: user.Role},
Action: "documents:read",
Resource: coresdk.ResourceRef{Type: "document", ID: req.DocumentId},
})
if err != nil || !allowed {
return nil, status.Error(codes.PermissionDenied, "access denied")
}
return s.store.GetDocument(ctx, req.DocumentId)
}Summary
| Language | Interceptor type | User propagation |
|---|---|---|
| Rust | tonic::service::Interceptor | Request::extensions_mut() |
| Python | grpc.aio.ServerInterceptor | Wrapped handler context |
| Go | grpc.UnaryServerInterceptor / StreamServerInterceptor | context.WithValue |
| TypeScript | grpc.ServerInterceptor | Metadata headers |
AI Agent Tool Gateway
Build AgentGate — a secure gateway that lets LLM agents call internal tools (databases, APIs, shell commands) with per-tool policy enforcement, rate limiting, and a full audit trail of every action taken. TypeScript + Express.
Migrating from Auth0 / Cognito
Step-by-step migration from Auth0, Amazon Cognito, or custom JWT middleware to CoreSDK with zero downtime.