Migrating from Auth0 / Cognito
Step-by-step migration from Auth0, Amazon Cognito, or custom JWT middleware to CoreSDK with zero downtime.
Phase note. Migration guides for Go and TypeScript SDKs apply to Phase 2. Python migration is available now.
Migrating from Auth0 / Cognito
This guide walks through migrating to CoreSDK from three common starting points: Auth0, Amazon Cognito, and hand-rolled JWT middleware. Each section shows before/after code and ends with a zero-downtime cut-over strategy.
Migrating from Auth0
Before
A typical Auth0 integration uses express-oauth2-jwt-bearer or a manual jsonwebtoken verification step:
// Before: Auth0 with express-oauth2-jwt-bearer
import { auth } from "express-oauth2-jwt-bearer";
const checkJwt = auth({
audience: "https://api.acme.com",
issuerBaseURL: "https://acme.us.auth0.com/",
});
app.get("/documents", checkJwt, async (req, res) => {
const userId = req.auth?.payload.sub;
// req.auth.payload contains raw claims — role lives at a custom namespace
const role = req.auth?.payload["https://acme.com/role"] ?? "viewer";
const docs = await getDocuments(userId);
res.json(docs);
});
After
// After: CoreSDK (Rust / Axum)
use coresdk_engine::{Engine, EngineConfig};
use axum::{middleware, Router};
let engine = Engine::from_env()?;
async fn get_documents(
Extension(user): Extension<VerifiedUser>,
State(state): State<AppState>,
) -> impl IntoResponse {
let docs = fetch_documents(&user.id).await.unwrap();
Json(docs)
}
# After: CoreSDK (Python / FastAPI)
import os
from coresdk import CoreSDKClient, SDKConfig
from coresdk.middleware.fastapi import CoreSDKMiddleware
from fastapi import Depends, FastAPI, Header, HTTPException
sdk = CoreSDKClient(SDKConfig.from_env())
async def get_current_user(
authorization: str = Header(..., alias="Authorization"),
):
token = authorization.removeprefix("Bearer ")
decision = sdk.validate_token(token)
if not decision.allowed:
raise HTTPException(status_code=401, detail=decision.reason)
return decision
@app.get("/documents")
async def list_documents(decision=Depends(get_current_user)):
docs = await get_documents(decision.subject)
return docs
// After: CoreSDK (Go)
// Build the SDK against the existing Auth0 tenant: the audience and issuer are
// the same values the "Before" middleware was configured with.
sdk, err := coresdk.NewSDK(coresdk.Config{
	Tenant:      "acme",
	JWKSUrl:     "https://acme.us.auth0.com/.well-known/jwks.json",
	JWTAudience: "https://api.acme.com",
	JWTIssuer:   "https://acme.us.auth0.com/",
	// ClaimsMapper converts the raw token claims into coresdk.UserClaims.
	ClaimsMapper: func(claims map[string]any) coresdk.UserClaims {
		// Comma-ok assertions: a missing or non-string claim yields "" rather
		// than panicking in the middle of request handling.
		id, _ := claims["sub"].(string)
		email, _ := claims["email"].(string)
		role, _ := claims["https://acme.com/role"].(string)
		if role == "" {
			role = "viewer"
		}
		return coresdk.UserClaims{
			ID:    id,
			Email: email,
			Role:  role, // mapped from Auth0 custom namespace
		}
	},
})
http.HandleFunc("/documents", func(w http.ResponseWriter, r *http.Request) {
token := strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ")
result, err := sdk.VerifyToken(r.Context(), token)
if err != nil || !result.OK {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
docs, _ := getDocuments(result.User.ID)
json.NewEncoder(w).Encode(docs)
})
// After: CoreSDK (TypeScript / Node.js)
import { CoreSDK } from "@coresdk/sdk";
// Tenant id is "acme" in both the constructor and the builder call so the two
// cannot disagree about which tenant this instance serves.
const sdk = new CoreSDK({ tenant: "acme" }) // Phase 2 planned API
  .tenant("acme")
  .jwksUrl("https://acme.us.auth0.com/.well-known/jwks.json") // same JWKS endpoint
  .jwtAudience("https://api.acme.com")
  .jwtIssuer("https://acme.us.auth0.com/")
  .claimsMapper((claims) => ({
    id: claims.sub,
    email: claims.email,
    role: claims["https://acme.com/role"] ?? "viewer", // map Auth0 custom namespace
  }))
  .build();
app.get("/documents", async (req, res) => {
const result = await sdk.verifyToken(req.headers.authorization?.slice(7) ?? "");
if (!result.ok) return res.status(result.error.status).json(result.error);
const docs = await getDocuments(result.user.id);
res.json(docs);
});
What changes
| Auth0 | CoreSDK |
|---|---|
| `req.auth.payload.sub` | `result.user.id` |
| Custom namespace claim `https://acme.com/role` | Mapped to `result.user.role` via `claimsMapper` |
| `express-oauth2-jwt-bearer` middleware | `sdk.verifyToken()` call |
| Auth0 Management API for user data | CoreSDK user context from token |
Migrating from Amazon Cognito
Before
Cognito verification typically uses aws-jwt-verify or the Cognito JWKS URL directly:
// Before: Cognito with aws-jwt-verify
import { CognitoJwtVerifier } from "aws-jwt-verify";
const verifier = CognitoJwtVerifier.create({
userPoolId: "us-east-1_ABC123",
tokenUse: "access",
clientId: "your-client-id",
});
app.use(async (req, res, next) => {
try {
const payload = await verifier.verify(
req.headers.authorization?.split(" ")[1] ?? ""
);
req.user = {
id: payload.sub,
groups: payload["cognito:groups"] ?? [],
};
next();
} catch {
res.status(401).json({ error: "Unauthorized" });
}
});
After
// After: CoreSDK (Rust)
use coresdk_engine::{Engine, EngineConfig};
let engine = Engine::from_env()?;
# After: CoreSDK (Python)
from coresdk import CoreSDKClient, SDKConfig
sdk = CoreSDKClient(SDKConfig.from_env())
async def auth_middleware(request: Request, call_next):
token = request.headers.get("Authorization", "").removeprefix("Bearer ")
decision = sdk.validate_token(token)
if not decision.allowed:
return JSONResponse(
status_code=401,
content={"error": decision.reason},
)
request.state.user = decision
return await call_next(request)
// After: CoreSDK (Go)
import coresdk "github.com/your-org/coresdk-go"
userPoolID := "us-east-1_ABC123"
region := "us-east-1"
// Build the SDK against the existing Cognito user pool.
sdk, err := coresdk.NewSDK(coresdk.Config{
	Tenant: "acme",
	// Cognito JWKS URL follows a predictable pattern
	JWKSUrl:   fmt.Sprintf("https://cognito-idp.%s.amazonaws.com/%s/.well-known/jwks.json", region, userPoolID),
	JWTIssuer: fmt.Sprintf("https://cognito-idp.%s.amazonaws.com/%s", region, userPoolID),
	// ClaimsMapper converts the raw token claims into coresdk.UserClaims.
	ClaimsMapper: func(claims map[string]any) coresdk.UserClaims {
		// Membership in the "admins" group maps to the admin role; everyone
		// else (including tokens with no groups claim) is a viewer.
		role := "viewer"
		if groups, ok := claims["cognito:groups"].([]any); ok {
			for _, g := range groups {
				if g == "admins" {
					role = "admin"
					break
				}
			}
		}
		// Comma-ok assertions: a missing or non-string claim yields "" rather
		// than panicking. This matters for "email" in particular, since Cognito
		// access tokens (tokenUse "access" in the Before snippet) do not
		// normally include an email claim.
		id, _ := claims["sub"].(string)
		email, _ := claims["email"].(string)
		return coresdk.UserClaims{
			ID:    id,
			Email: email,
			Role:  role,
		}
	},
})
func AuthMiddleware(sdk *coresdk.SDK) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
token := strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ")
result, err := sdk.VerifyToken(r.Context(), token)
if err != nil || !result.OK {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
ctx := context.WithValue(r.Context(), userKey, result.User)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
}
// After: CoreSDK (TypeScript / Node.js)
import { CoreSDK } from "@coresdk/sdk";
const userPoolId = "us-east-1_ABC123";
const region = "us-east-1";
// Tenant id is "acme" in both the constructor and the builder call so the two
// cannot disagree about which tenant this instance serves.
const sdk = new CoreSDK({ tenant: "acme" }) // Phase 2 planned API
  .tenant("acme")
  // Cognito JWKS URL follows a predictable pattern
  .jwksUrl(`https://cognito-idp.${region}.amazonaws.com/${userPoolId}/.well-known/jwks.json`)
  .jwtIssuer(`https://cognito-idp.${region}.amazonaws.com/${userPoolId}`)
  .claimsMapper((claims) => ({
    id: claims.sub,
    email: claims.email,
    // The groups claim may be absent; the cast admits undefined so the
    // `?? []` fallback is meaningful to the type checker.
    role: ((claims["cognito:groups"] as string[] | undefined) ?? []).includes("admins")
      ? "admin"
      : "viewer",
  }))
  .build();
app.use(async (req, res, next) => {
const token = req.headers.authorization?.slice(7) ?? "";
const result = await sdk.verifyToken(token);
if (!result.ok) {
return res.status(result.error.status).json({
error: result.error.title,
trace_id: result.error.traceId,
});
}
req.user = result.user;
next();
});
What changes
| Cognito | CoreSDK |
|---|---|
| `CognitoJwtVerifier` | `sdk.verifyToken()` |
| `payload["cognito:groups"]` | Mapped to `user.role` via `claimsMapper` |
| Library-specific error types | Structured error.status / error.title |
| Per-client-ID config | Single SDK instance, multi-tenant aware |
Migrating from custom JWT middleware
Before
Many teams write their own verification using jsonwebtoken (Node.js) or PyJWT (Python):
// Before: custom Node.js JWT middleware
import jwt from "jsonwebtoken";
import jwksRsa from "jwks-rsa";
const client = jwksRsa({ jwksUri: "https://your-idp.com/.well-known/jwks.json" });
function getKey(header, callback) {
client.getSigningKey(header.kid, (err, key) => {
callback(err, key?.getPublicKey());
});
}
app.use((req, res, next) => {
const token = req.headers.authorization?.split(" ")[1];
if (!token) return res.status(401).json({ error: "No token" });
jwt.verify(token, getKey, { algorithms: ["RS256"] }, (err, decoded) => {
if (err) return res.status(401).json({ error: err.message });
req.user = decoded;
next();
});
});
# Before: custom Python JWT middleware (FastAPI)
import jwt
from fastapi import Request, HTTPException
JWKS_URI = "https://your-idp.com/.well-known/jwks.json"
async def verify_token(request: Request):
token = request.headers.get("Authorization", "").removeprefix("Bearer ")
try:
# Manual JWKS fetch, key selection, and verification
keys = await fetch_jwks(JWKS_URI)
header = jwt.get_unverified_header(token)
key = next(k for k in keys if k["kid"] == header["kid"])
payload = jwt.decode(token, key, algorithms=["RS256"])
return payload
except Exception as e:
raise HTTPException(status_code=401, detail=str(e))
After
// After: CoreSDK (Rust / Axum)
use coresdk_engine::{Engine, EngineConfig};
use axum::{
extract::State,
http::{Request, StatusCode},
middleware::Next,
response::Response,
Extension,
};
let engine = Engine::from_env()?;
async fn auth_middleware(
State(sdk): State<CoreSDK>,
mut req: Request<Body>,
next: Next<Body>,
) -> Result<Response, StatusCode> {
let token = req
.headers()
.get("authorization")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.strip_prefix("Bearer "))
.ok_or(StatusCode::UNAUTHORIZED)?;
let result = sdk.verify_token(token).await
.map_err(|_| StatusCode::UNAUTHORIZED)?;
req.extensions_mut().insert(result.user);
Ok(next.run(req).await)
}
# After: CoreSDK (Python / FastAPI)
from coresdk import CoreSDKClient, SDKConfig
from fastapi import Depends, HTTPException, Header
sdk = CoreSDKClient(SDKConfig.from_env())
async def get_current_user(
authorization: str = Header(..., alias="Authorization"),
):
token = authorization.removeprefix("Bearer ")
decision = sdk.validate_token(token)
if not decision.allowed:
raise HTTPException(status_code=401, detail=decision.reason)
return decision
// After: CoreSDK (Go)
import coresdk "github.com/your-org/coresdk-go"
sdk, err := coresdk.NewSDK(coresdk.Config{
Tenant: "acme",
JWKSUrl: "https://your-idp.com/.well-known/jwks.json",
})
// AuthMiddleware returns net/http middleware that verifies the request's bearer
// token with the SDK. On success the verified user is stored in the request
// context under userKey and the next handler runs; otherwise the request is
// rejected with 401.
func AuthMiddleware(sdk *coresdk.SDK) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			token := strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ")
			if token == "" {
				http.Error(w, "Unauthorized", http.StatusUnauthorized)
				return
			}
			result, err := sdk.VerifyToken(r.Context(), token)
			if err != nil {
				// Handle the call failure on its own, without touching result.
				// NOTE(review): result is assumed not to be usable when err is
				// non-nil — confirm against the SDK contract.
				http.Error(w, "Unauthorized", http.StatusUnauthorized)
				return
			}
			if !result.OK {
				// Verification completed but the token was rejected: return the
				// structured error fields as JSON.
				w.Header().Set("Content-Type", "application/json")
				w.WriteHeader(http.StatusUnauthorized)
				json.NewEncoder(w).Encode(map[string]string{
					"type":     result.Error.Type,
					"title":    result.Error.Title,
					"trace_id": result.Error.TraceID,
				})
				return
			}
			ctx := context.WithValue(r.Context(), userKey, result.User)
			next.ServeHTTP(w, r.WithContext(ctx))
		})
	}
}
// After: CoreSDK (TypeScript / Node.js)
import { CoreSDK } from "@coresdk/sdk";
// Tenant id is "acme" in both the constructor and the builder call so the two
// cannot disagree about which tenant this instance serves.
const sdk = new CoreSDK({ tenant: "acme" }) // Phase 2 planned API
  .tenant("acme")
  .jwksUrl("https://your-idp.com/.well-known/jwks.json")
  .build();
app.use(async (req, res, next) => {
const token = req.headers.authorization?.slice(7) ?? "";
const result = await sdk.verifyToken(token);
if (!result.ok) {
return res.status(result.error.status).json({
type: result.error.type,
title: result.error.title,
trace_id: result.error.traceId,
});
}
req.user = result.user;
next();
});
What changes
| Custom middleware | CoreSDK |
|---|---|
| Manual JWKS fetch and cache | Automatic, with Cache-Control-aware refresh |
| Manual key selection by `kid` | Handled internally |
| `jwt.verify` / `jwt.decode` | `sdk.verifyToken()` |
| Hand-rolled error responses | Structured RFC 7807 error objects |
| No policy layer | sdk.policy.check() available immediately |
Zero-downtime migration strategy
Running two verification paths in parallel lets you migrate without a flag day or scheduled downtime.
Step 1 — Deploy CoreSDK alongside existing auth
// Dual-verify middleware — accepts tokens from both old and new paths
app.use(async (req, res, next) => {
const token = req.headers.authorization?.slice(7) ?? "";
if (!token) return res.status(401).json({ error: "Unauthorized" });
// Try CoreSDK first
const result = await sdk.verifyToken(token);
if (result.ok) {
req.user = result.user;
return next();
}
// Fall back to legacy verifier while migration is in progress
try {
const legacyUser = await legacyVerify(token);
req.user = legacyUser;
metrics.increment("auth.legacy_fallback");
return next();
} catch {
return res.status(401).json({ error: "Unauthorized" });
}
});
Step 2 — Monitor the fallback counter
Watch auth.legacy_fallback in your metrics dashboard. As clients rotate to tokens issued by your new identity provider, the counter trends toward zero.
Step 3 — Cut over
Once the fallback counter reaches zero for your desired observation window (typically 24–48 hours), remove the legacy path:
// Clean cut-over — legacy code removed
app.use(async (req, res, next) => {
const token = req.headers.authorization?.slice(7) ?? "";
const result = await sdk.verifyToken(token);
if (!result.ok) {
return res.status(result.error.status).json(result.error);
}
req.user = result.user;
next();
});
Step 4 — Clean up
- Remove the legacy JWT library dependency.
- Delete the old middleware file.
- Remove feature flags controlling the dual-verify path.
- Close any migration tracking issues.
gRPC Services
Use CoreSDK interceptors to authenticate unary and streaming gRPC calls and propagate user context via metadata.
Runtime Coverage & Dead Code Detection
Use CoreSDK's trace-based coverage mode to find functions that are never called during integration testing — across any language, without static analysis.