# CloudEvents Envelope

Wrap CoreSDK OTel log events in CloudEvents 1.0 metadata for interoperability with Kafka, EventBridge, Knative Eventing, and Azure Event Grid.

**Phase note.** Ships in Phase 2.
CoreSDK can wrap its OTel log events in a CloudEvents 1.0 metadata envelope before export. This makes the events consumable by event-driven systems — Kafka, EventBridge, Knative Eventing, Azure Event Grid — without any transformation layer on the consumer side.
Traces and metrics continue to export over OTLP in standard format. CloudEvents wrapping applies to log events only.
## Rust API

### `CloudEvent` struct
```rust
use coresdk_cloudevents::{CloudEvent, CloudEventEmitter};
use opentelemetry::trace::Span;

// Build a CloudEvent directly
let event = CloudEvent {
    specversion: "1.0".to_string(),
    id: ulid::Ulid::new().to_string(),
    source: "https://orders.acme.com".to_string(),
    event_type: "io.coresdk.policy.decision".to_string(),
    subject: Some("tenant/acme/user/usr_2xK9".to_string()),
    time: Some(chrono::Utc::now()),
    datacontenttype: Some("application/json".to_string()),
    data: serde_json::json!({ "result": "allowed" }),
};

// Serialize to CloudEvents JSON (spec 1.0 format)
let json_str = event.to_json()?;

// Build a CloudEvent from an OTel span
let ce = CloudEvent::from_otel_span(&span, "https://orders.acme.com")?;
```

### `CloudEventEmitter`
`CloudEventEmitter` is the background component that wraps OTel log records and ships them over the configured binding (HTTP or Kafka):
```rust
use coresdk_cloudevents::{CloudEventEmitter, EmitterConfig, Binding};

let emitter = CloudEventEmitter::new(EmitterConfig {
    source: "https://orders.acme.com".to_string(),
    binding: Binding::Http {
        endpoint: "https://events.acme.internal/ingest".to_string(),
        batch_size: 50,
        flush_secs: 5,
    },
});

// Register as an OTel SpanProcessor — fires on every span end
sdk.register_span_processor(emitter);
```

## Why CloudEvents alongside OTel
OTel gives you a consistent trace and metric model across services. CloudEvents gives you a consistent event envelope model for async messaging. They solve different problems and compose naturally:
- OTel traces answer "what happened in this request across service boundaries?"
- CloudEvents answer "what event occurred, from what source, at what time, with what subject?"
When a CoreSDK policy decision, auth event, or tenant lifecycle change needs to fan out to downstream subscribers — audit consumers, alerting pipelines, data warehouses — a CloudEvents envelope lets those consumers use standard CloudEvents SDKs rather than parsing raw OTel log records.
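The two models compose on the consumer side: the CloudEvents attributes carry routing information, while the `trace_id` inside `data` links the event back to its OTel trace. A minimal stdlib-only sketch (the tracing URL is a hypothetical placeholder; the payload fields match the JSON example later on this page):

```python
import json

# A structured-mode CloudEvents envelope as exported by CoreSDK
envelope = json.loads("""{
  "specversion": "1.0",
  "type": "io.coresdk.policy.decision",
  "source": "https://orders.acme.com",
  "subject": "tenant/acme/user/usr_2xK9",
  "id": "01HX7KQMB4NWE9P6T2JS0RY3ZV",
  "time": "2026-03-19T14:35:00Z",
  "datacontenttype": "application/json",
  "data": {"result": "allowed", "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736"}
}""")

# CloudEvents attributes answer: what event, from what source, about what subject
routing_key = (envelope["type"], envelope["subject"])

# The embedded OTel trace_id links the event back to the distributed trace
# (tracing.example.internal is a placeholder for your tracing backend)
trace_url = f"https://tracing.example.internal/trace/{envelope['data']['trace_id']}"
```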
## Configuration
```yaml
# Phase 2 — configure via sidecar YAML
# cloud_events:
#   enabled: true
#   source: https://orders.acme.com
```

Go:

```go
import "github.com/coresdk/sdk"

client, err := sdk.New(sdk.Config{
	Tenant:            "acme",
	OtelEndpoint:      "http://localhost:4317",
	CloudEvents:       true,
	CloudEventsSource: "https://orders.acme.com",
})
```

Rust:

```rust
let engine = Engine::from_env().await?;
// cloud_events configured via coresdk-sidecar.yaml
```

Sidecar YAML:
```yaml
# coresdk-sidecar.yaml
tenant: acme
otel:
  endpoint: http://localhost:4317
cloud_events:
  enabled: true
  source: https://orders.acme.com
  # binding: http   # http (default) or kafka
```

## JSON payload example
Each exported OTel log event is wrapped in a CloudEvents envelope. The `data` field contains the CoreSDK event payload; the outer fields are standard CloudEvents attributes.
```json
{
  "specversion": "1.0",
  "type": "io.coresdk.policy.decision",
  "source": "https://orders.acme.com",
  "subject": "tenant/acme/user/usr_2xK9",
  "id": "01HX7KQMB4NWE9P6T2JS0RY3ZV",
  "time": "2026-03-19T14:35:00Z",
  "datacontenttype": "application/json",
  "data": {
    "tenant_id": "acme",
    "user_id": "usr_2xK9",
    "action": "orders:create",
    "result": "allowed",
    "policy_latency_us": 43,
    "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
    "span_id": "00f067aa0ba902b7"
  }
}
```

Auth events use a different `type`:
```json
{
  "specversion": "1.0",
  "type": "io.coresdk.auth.verified",
  "source": "https://orders.acme.com",
  "subject": "tenant/acme/user/usr_2xK9",
  "id": "01HX7M3NR8PQT5WE6VKZJ2CY4B",
  "time": "2026-03-19T14:35:00Z",
  "datacontenttype": "application/json",
  "data": {
    "tenant_id": "acme",
    "user_id": "usr_2xK9",
    "auth_method": "jwt",
    "result": "allowed",
    "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736"
  }
}
```

## Event types
| `type` | Trigger |
|---|---|
| `io.coresdk.policy.decision` | Every Rego policy evaluation |
| `io.coresdk.auth.verified` | Successful JWT/JWK verification |
| `io.coresdk.auth.denied` | Auth rejection (invalid token, expired, wrong audience) |
| `io.coresdk.tenant.created` | New tenant registered |
| `io.coresdk.tenant.suspended` | Tenant suspended by policy or operator |
| `io.coresdk.secret.rotated` | Secret value refreshed from vault |
| `io.coresdk.error` | Any RFC 9457 error emitted by the SDK |
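A consumer will typically validate the required CloudEvents attributes, then route on `type`. A minimal sketch in Python (the handler names and the `default` fallback key are illustrative, not part of any CoreSDK API):

```python
import json

# Required context attributes per the CloudEvents 1.0 spec
REQUIRED = ("specversion", "id", "source", "type")

def dispatch(raw: bytes, handlers: dict) -> str:
    """Validate the envelope, then route to a handler keyed by event type."""
    event = json.loads(raw)
    missing = [attr for attr in REQUIRED if attr not in event]
    if missing:
        raise ValueError(f"not a valid CloudEvent, missing: {missing}")
    handler = handlers.get(event["type"], handlers["default"])
    return handler(event["data"])

handlers = {
    "io.coresdk.policy.decision": lambda data: f"decision={data['result']}",
    "io.coresdk.auth.denied": lambda data: "page the on-call",
    "default": lambda data: "archived",   # unrecognized types still land somewhere
}
```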
## HTTP binding
With the HTTP binding (default), CoreSDK posts each event as an HTTP request to the configured endpoint using the CloudEvents HTTP binding spec.
```yaml
cloud_events:
  enabled: true
  source: https://orders.acme.com
  binding: http
  http:
    endpoint: https://events.acme.internal/ingest
    headers:
      Authorization: "Bearer ${EVENTS_TOKEN}"
    batch_size: 50      # send up to 50 events per request
    flush_interval: 5   # seconds
```

The HTTP binding uses `Content-Type: application/cloudevents+json` for single events and `application/cloudevents-batch+json` for batched requests.
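On the receiving side, the Content-Type header tells you whether the body holds one event or a batch. A stdlib-only sketch, independent of any web framework:

```python
import json

def parse_cloudevents(content_type: str, body: bytes) -> list:
    """Return a list of events whether the request was single or batched."""
    media_type = content_type.split(";")[0].strip()
    if media_type == "application/cloudevents+json":
        return [json.loads(body)]   # single structured-mode event
    if media_type == "application/cloudevents-batch+json":
        return json.loads(body)     # JSON array of events
    raise ValueError(f"unsupported content type: {media_type}")
```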
## Kafka binding
With the Kafka binding, CoreSDK publishes each event to a Kafka topic. The CloudEvents Kafka binding spec maps event attributes to Kafka message headers.
```yaml
cloud_events:
  enabled: true
  source: https://orders.acme.com
  binding: kafka
  kafka:
    brokers:
      - kafka-1.acme.internal:9092
      - kafka-2.acme.internal:9092
    topic: coresdk-events
    tls: true
    sasl:
      mechanism: SCRAM-SHA-512
      username: ${KAFKA_USERNAME}
      password: ${KAFKA_PASSWORD}
```

The Kafka message key is set to `subject` (e.g. `tenant/acme/user/usr_2xK9`) so that events for the same subject are delivered to the same partition in order.
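Producer-side, the record layout that the consumer example below relies on can be sketched as a pure mapping: the full envelope as the message value, `ce_`-prefixed attribute headers for routing, and `subject` as the partition key. This is a sketch of the layout, not CoreSDK code; the authoritative attribute-to-header mapping is defined by the CloudEvents Kafka protocol binding spec.

```python
import json

def to_kafka_record(event: dict):
    """Map a CloudEvents envelope to a (key, headers, value) Kafka record."""
    # Every context attribute except data becomes a ce_-prefixed header
    attrs = {name: value for name, value in event.items() if name != "data"}
    headers = [(f"ce_{name}", str(value).encode()) for name, value in attrs.items()]
    key = event["subject"].encode()      # same subject -> same partition, in order
    value = json.dumps(event).encode()   # full envelope as the message value
    return key, headers, value
```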
Consumer example:
```python
from confluent_kafka import Consumer
import json

consumer = Consumer({
    "bootstrap.servers": "kafka-1.acme.internal:9092",
    "group.id": "audit-consumer",
    "auto.offset.reset": "earliest",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-512",
    "sasl.username": "audit-consumer",
    "sasl.password": "...",
})
consumer.subscribe(["coresdk-events"])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    event = json.loads(msg.value())
    # msg.headers() returns a list of (key, value) tuples, not a dict
    headers = dict(msg.headers() or [])
    event_type = headers.get("ce_type", b"").decode()
    if event_type == "io.coresdk.policy.decision":
        handle_policy_decision(event["data"])
    elif event_type == "io.coresdk.auth.denied":
        handle_auth_denial(event["data"])
```

## `source` and `subject` fields
The `source` field is the URI you configure via `cloud_events.source` in the sidecar YAML (or `CloudEventsSource` in the Go SDK). It should identify the service that produced the event — typically the public or internal base URL of the service.
The `subject` field is set automatically by CoreSDK and follows this structure:

```text
tenant/{tenant_id}/user/{user_id}
```

For events not tied to a specific user (e.g. `io.coresdk.tenant.created`):

```text
tenant/{tenant_id}
```

## Security
CloudEvents export goes through the same egress path as OTel export and is subject to PII masking. User IDs and tenant IDs appear in the `subject` and `data` fields, but variable values from request bodies are never included.
If you need to gate which event types are exported, use the `customer_egress_policy` field:
```yaml
cloud_events:
  enabled: true
  source: https://orders.acme.com
  binding: kafka
  customer_egress_policy: "acme/cloudevents-egress"
  kafka:
    brokers:
      - kafka-1.acme.internal:9092
    topic: coresdk-events
```

The Rego policy receives `input.event_type` and `input.subject` and must return `allow = true` for the event to be published.
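Under those assumptions, an egress policy might look like the following sketch (the package name and the specific allowed types are illustrative; only `input.event_type`, `input.subject`, and the `allow` result are documented above):

```rego
package acme.cloudevents_egress

# Deny by default; only explicitly allowed event types are published
default allow = false

# Export auth denials
allow {
    input.event_type == "io.coresdk.auth.denied"
}

# Export all tenant lifecycle events
allow {
    startswith(input.event_type, "io.coresdk.tenant.")
}
```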
## Next steps

- OpenTelemetry — OTel traces and metrics configuration
- LLM Trace Export — minimal diagnostic payloads for LLM root-cause analysis
- Authorization & Policy — writing Rego egress policies