RBAC & Roles
Define roles, assign them to users, and enforce role-based access in CoreSDK middleware.
CoreSDK's RBAC model gives every user a role embedded in their JWT claims, and every route a required role check. Roles are evaluated via the Rego policy engine; middleware enforces access before your handler runs.
How roles work
Roles are carried in JWT claims and passed to the policy engine as context.roles in the PolicyInput. Your Rego policy (data.authz.allow) receives the full input shape and decides whether to allow the request.
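To make the input shape concrete, here is a minimal Python sketch that assembles a PolicyInput-style dictionary from JWT claims. The field names mirror the evaluate_policy examples later on this page. The helper names build_policy_input and allow_sketch are hypothetical, and allow_sketch is only a Python stand-in for the kind of rule data.authz.allow might express; the real decision is made by your Rego policy.

```python
from typing import Any, Dict, Mapping, Optional


def build_policy_input(
    claims: Mapping[str, Any],
    tenant_id: str,
    action: str,
    resource: str,
    resource_owner: Optional[str] = None,
) -> Dict[str, Any]:
    """Assemble a PolicyInput-shaped dict (hypothetical helper, not a CoreSDK API)."""
    return {
        "tenant_id": tenant_id,
        "subject": claims["sub"],
        "action": action,
        "resource": resource,
        "resource_owner": resource_owner,
        # Roles travel from the JWT claims into context.roles.
        "context": {"roles": list(claims.get("roles", []))},
    }


def allow_sketch(policy_input: Mapping[str, Any]) -> bool:
    """Illustrative rule only: admins may do anything, members may act on
    resources they own. Your Rego policy defines the real behaviour."""
    roles = set(policy_input["context"]["roles"])
    if "admin" in roles:
        return True
    owns_resource = policy_input["resource_owner"] == policy_input["subject"]
    return "member" in roles and owns_resource
```

Keeping roles under context rather than at the top level of the input matches the shape used by the evaluate_policy calls, so the same Rego rule can serve both middleware and direct checks.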
The CoreSDKMiddleware (FastAPI) and equivalent middleware for other frameworks validate the token, extract claims, and make them available to your handler.
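The three middleware steps (validate, extract, expose) can be pictured with a small framework-agnostic sketch. Everything in it is an assumption made for illustration: extract_bearer_token, authenticate, and the validate callback are hypothetical names, not CoreSDK APIs, and real token validation (signature, expiry, issuer) is left entirely to the callback. Only the coresdk_user attribute name comes from the FastAPI examples on this page.

```python
from types import SimpleNamespace
from typing import Any, Callable, Mapping, Optional

Claims = Mapping[str, Any]


def extract_bearer_token(headers: Mapping[str, str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' header, or None.

    Assumes header names have already been lower-cased.
    """
    value = headers.get("authorization", "")
    scheme, _, token = value.partition(" ")
    if scheme.lower() != "bearer" or not token.strip():
        return None
    return token.strip()


def authenticate(
    headers: Mapping[str, str],
    state: Any,
    validate: Callable[[str], Optional[Claims]],
) -> bool:
    """Validate the token and expose its claims on the request state.

    `validate` is a caller-supplied stand-in for real JWT verification;
    it returns the claims, or None when the token is rejected.
    """
    token = extract_bearer_token(headers)
    if token is None:
        return False
    claims = validate(token)
    if claims is None:
        return False
    state.coresdk_user = claims  # same attribute the FastAPI handlers read
    return True


# Usage with a fake validator that accepts any token.
state = SimpleNamespace()
accepted = authenticate(
    {"authorization": "Bearer demo-token"},
    state,
    lambda token: {"sub": "usr_2xK9", "roles": ["member"]},
)
```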
Setting up middleware
use coresdk_engine::{Engine, EngineConfig, error::ProblemDetail};
use axum::{Router, middleware};

let engine = Engine::from_env()?;
let app = Router::new()
    .route("/api/orders", axum::routing::get(list_orders))
    .route_layer(middleware::from_fn_with_state(
        engine.clone(),
        coresdk_engine::middleware::auth_middleware,
    ));

from coresdk import CoreSDKClient, SDKConfig
from coresdk.middleware.fastapi import CoreSDKMiddleware
from fastapi import FastAPI

_sdk = CoreSDKClient(SDKConfig(
    sidecar_addr="http://127.0.0.1:7233",
    tenant_id="acme",
    service_name="orders-api",
))

app = FastAPI()
app.add_middleware(CoreSDKMiddleware, sdk=_sdk)

Accessing the authenticated user
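After the middleware runs, the validated claims are available to your handler. In FastAPI they are set on request.state.coresdk_user; the Axum handler shown here reads them from the authorization decision.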
use coresdk_engine::{Engine, auth::decision::AuthRequest, error::ProblemDetail};

// Explicit Result return type; assumes ProblemDetail implements IntoResponse.
async fn list_orders(
    axum::extract::State(engine): axum::extract::State<Engine>,
    headers: axum::http::HeaderMap,
) -> Result<axum::Json<serde_json::Value>, ProblemDetail> {
    // Pull the bearer token from the Authorization header; empty if absent.
    let token = headers
        .get("authorization")
        .and_then(|v| v.to_str().ok())
        .and_then(|v| v.strip_prefix("Bearer "))
        .unwrap_or("");
    let decision = engine.auth().authorize(AuthRequest {
        token: token.to_string(),
        ..Default::default()
    })?;
    if !decision.allowed {
        return Err(ProblemDetail::unauthorized("missing or invalid token"));
    }
    // decision.claims contains the JWT claims
    Ok(axum::Json(serde_json::json!({ "user": decision.claims })))
}

from fastapi import Request
@app.get("/api/orders")
async def list_orders(request: Request):
    user = request.state.coresdk_user  # set by CoreSDKMiddleware
    return {"user_id": user["sub"], "roles": user.get("roles", [])}

Enforcing roles with a dependency
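Use a FastAPI dependency to require one of a set of roles before a handler runs. If no authenticated user is present, the dependency raises a ProblemDetailError with status 401; if the user holds none of the allowed roles, it raises one with status 403. In both cases the handler never executes.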
from fastapi import Depends, Request
from coresdk.errors._rfc9457 import ProblemDetailError

def require_role(*allowed_roles: str):
    """Build a dependency that admits users holding any of allowed_roles."""
    async def _check(request: Request):
        # coresdk_user is set by CoreSDKMiddleware after token validation.
        user = getattr(request.state, "coresdk_user", None)
        if user is None:
            raise ProblemDetailError(
                status=401,
                title="Unauthorized",
                detail="No authenticated user found.",
                type="https://coresdk.dev/errors/unauthorized",
            )
        roles = user.get("roles", [])
        if not any(r in roles for r in allowed_roles):
            raise ProblemDetailError(
                status=403,
                title="Forbidden",
                detail=f"Role must be one of: {', '.join(allowed_roles)}",
                type="https://coresdk.dev/errors/forbidden",
            )
        return user
    return _check

# db and OrderIn are defined by your application.
@app.get("/api/orders")
async def list_orders(user=Depends(require_role("member", "admin"))):
    return await db.orders.for_user(user["sub"])

@app.post("/api/orders")
async def create_order(payload: OrderIn, user=Depends(require_role("member", "admin"))):
    return await db.orders.create(user["sub"], payload)

@app.get("/admin/users")
async def list_users(user=Depends(require_role("admin"))):
    return await db.users.all()

When a user's role does not permit the action, CoreSDK returns a 403 before the handler runs:
{
  "type": "https://coresdk.dev/errors/forbidden",
  "title": "Forbidden",
  "status": 403,
  "detail": "Role must be one of: admin",
  "trace_id": "01HX7KQMB4NWE9P6T2JS0RY3ZV"
}

Evaluating roles via the policy engine
For fine-grained checks, call evaluate_policy directly. The context.roles field carries the user's roles into your Rego rule.
use coresdk_engine::{Engine, error::ProblemDetail, policy::decision::PolicyInput};

// Policy evaluation is run on a blocking thread so it does not stall the async runtime.
let allowed = tokio::task::spawn_blocking({
    let engine = engine.clone();
    move || engine.policy().evaluate("data.authz.allow", PolicyInput {
        tenant_id: "acme".to_string(),
        subject: "usr_2xK9".to_string(),
        action: "orders:read".to_string(),
        resource: "ord_7mP3".to_string(),
        resource_owner: Some("usr_2xK9".to_string()),
        context: serde_json::json!({ "roles": ["member"] }),
    })
}).await??;
if !allowed {
    return Err(ProblemDetail::forbidden("insufficient role"));
}

allowed = _sdk.evaluate_policy("data.authz.allow", {
    "tenant_id": "acme",
    "subject": "usr_2xK9",
    "action": "orders:read",
    "resource": "ord_7mP3",
    "resource_owner": "usr_2xK9",
    "context": {"roles": ["member"]},
})
if not allowed:
    raise ProblemDetailError(
        status=403,
        title="Forbidden",
        detail="insufficient role",
        type="https://coresdk.dev/errors/forbidden",
    )

Go and TypeScript
Phase 2: the Go and TypeScript SDKs ship in Phase 2. The API shown here is the planned surface.
// Go (Phase 2)
import sdk "github.com/coresdk-dev/sdk-go"

s := sdk.New(sdk.Config{
    SidecarAddr: "http://127.0.0.1:7233",
    TenantID:    "acme",
    ServiceName: "orders-api",
})
mux.Handle("/api/orders", s.RequireRole("member", "admin")(listOrdersHandler))

// TypeScript (Phase 2)
import { CoreSDKClient } from "@coresdk/node";

const sdk = new CoreSDKClient({
  sidecarAddr: "http://127.0.0.1:7233",
  tenantId: "acme",
  serviceName: "orders-api",
});
app.get("/api/orders", sdk.requireRole("member", "admin"), listOrders);

Next steps
- JWT Authentication — how roles are extracted from token claims
- Writing policies — combine RBAC with attribute-based policy for fine-grained control
- Testing Policies — unit test role rules before deploying