Skip to main content
CoreSDK
Observability

CloudEvents Envelope

Wrap CoreSDK OTel log events in CloudEvents 1.0 metadata for interoperability with Kafka, EventBridge, Knative Eventing, and Azure Event Grid.

Phase note. Ships Phase 2.

CloudEvents Envelope

CoreSDK can wrap its OTel log events in a CloudEvents 1.0 metadata envelope before export. This makes the events consumable by event-driven systems — Kafka, EventBridge, Knative Eventing, Azure Event Grid — without any transformation layer on the consumer side.

Traces and metrics continue to export over OTLP in standard format. CloudEvents wrapping applies to log events only.


Rust API

CloudEvent struct

use coresdk_cloudevents::{CloudEvent, CloudEventEmitter};
use opentelemetry::trace::Span;

// Build a CloudEvent directly
let event = CloudEvent {
    specversion:     "1.0".to_string(),
    id:              ulid::Ulid::new().to_string(),
    source:          "https://orders.acme.com".to_string(),
    event_type:      "io.coresdk.policy.decision".to_string(),
    subject:         Some("tenant/acme/user/usr_2xK9".to_string()),
    time:            Some(chrono::Utc::now()),
    datacontenttype: Some("application/json".to_string()),
    data:            serde_json::json!({ "result": "allowed" }),
};

// Serialize to CloudEvents JSON (spec 1.0 format)
let json_str = event.to_json()?;

// Build a CloudEvent from an OTel span
let ce = CloudEvent::from_otel_span(&span, "https://orders.acme.com")?;

CloudEventEmitter

CloudEventEmitter is the background component that wraps OTel log records and ships them over the configured binding (HTTP or Kafka):

use coresdk_cloudevents::{CloudEventEmitter, EmitterConfig, Binding};

let emitter = CloudEventEmitter::new(EmitterConfig {
    source:  "https://orders.acme.com".to_string(),
    binding: Binding::Http {
        endpoint:   "https://events.acme.internal/ingest".to_string(),
        batch_size: 50,
        flush_secs: 5,
    },
});

// Register as an OTel SpanProcessor — fires on every span end
sdk.register_span_processor(emitter);

Why CloudEvents alongside OTel

OTel gives you a consistent trace and metric model across services. CloudEvents gives you a consistent event envelope model for async messaging. They solve different problems and compose naturally:

  • OTel traces answer "what happened in this request across service boundaries?"
  • CloudEvents answer "what event occurred, from what source, at what time, with what subject?"

When a CoreSDK policy decision, auth event, or tenant lifecycle change needs to fan out to downstream subscribers — audit consumers, alerting pipelines, data warehouses — a CloudEvents envelope lets those consumers use standard CloudEvents SDKs rather than parsing raw OTEL log records.


Configuration

# Phase 2 — configure via sidecar YAML
# cloud_events:
#   enabled: true
#   source: https://orders.acme.com
import "github.com/coresdk/sdk"

client, err := sdk.New(sdk.Config{
    Tenant:            "acme",
    OtelEndpoint:      "http://localhost:4317",
    CloudEvents:       true,
    CloudEventsSource: "https://orders.acme.com",
})
let engine = Engine::from_env().await?;
// cloud_events configured via coresdk-sidecar.yaml

Sidecar YAML:

# coresdk-sidecar.yaml
tenant: acme

otel:
  endpoint: http://localhost:4317

cloud_events:
  enabled: true
  source: https://orders.acme.com
  # binding: http    # http (default) or kafka

JSON payload example

Each exported OTel log event is wrapped in a CloudEvents envelope. The data field contains the CoreSDK event payload; the outer fields are standard CloudEvents attributes.

{
  "specversion": "1.0",
  "type": "io.coresdk.policy.decision",
  "source": "https://orders.acme.com",
  "subject": "tenant/acme/user/usr_2xK9",
  "id": "01HX7KQMB4NWE9P6T2JS0RY3ZV",
  "time": "2026-03-19T14:35:00Z",
  "datacontenttype": "application/json",
  "data": {
    "tenant_id": "acme",
    "user_id": "usr_2xK9",
    "action": "orders:create",
    "result": "allowed",
    "policy_latency_us": 43,
    "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
    "span_id": "00f067aa0ba902b7"
  }
}

Auth events use a different type:

{
  "specversion": "1.0",
  "type": "io.coresdk.auth.verified",
  "source": "https://orders.acme.com",
  "subject": "tenant/acme/user/usr_2xK9",
  "id": "01HX7M3NR8PQT5WE6VKZJ2CY4B",
  "time": "2026-03-19T14:35:00Z",
  "datacontenttype": "application/json",
  "data": {
    "tenant_id": "acme",
    "user_id": "usr_2xK9",
    "auth_method": "jwt",
    "result": "allowed",
    "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736"
  }
}

Event types

typeTrigger
io.coresdk.policy.decisionEvery Rego policy evaluation
io.coresdk.auth.verifiedSuccessful JWT/JWK verification
io.coresdk.auth.deniedAuth rejection (invalid token, expired, wrong audience)
io.coresdk.tenant.createdNew tenant registered
io.coresdk.tenant.suspendedTenant suspended by policy or operator
io.coresdk.secret.rotatedSecret value refreshed from vault
io.coresdk.errorAny RFC 9457 error emitted by the SDK

HTTP binding

With the HTTP binding (default), CoreSDK posts each event as an HTTP request to the configured endpoint using the CloudEvents HTTP binding spec.

cloud_events:
  enabled: true
  source: https://orders.acme.com
  binding: http
  http:
    endpoint: https://events.acme.internal/ingest
    headers:
      Authorization: "Bearer ${EVENTS_TOKEN}"
    batch_size: 50          # send up to 50 events per request
    flush_interval: 5       # seconds

The HTTP binding uses Content-Type: application/cloudevents+json for single events and application/cloudevents-batch+json for batched requests.


Kafka binding

With the Kafka binding, CoreSDK publishes each event to a Kafka topic. The CloudEvents Kafka binding spec maps event attributes to Kafka message headers.

cloud_events:
  enabled: true
  source: https://orders.acme.com
  binding: kafka
  kafka:
    brokers:
      - kafka-1.acme.internal:9092
      - kafka-2.acme.internal:9092
    topic: coresdk-events
    tls: true
    sasl:
      mechanism: SCRAM-SHA-512
      username: ${KAFKA_USERNAME}
      password: ${KAFKA_PASSWORD}

The Kafka message key is set to subject (e.g. tenant/acme/user/usr_2xK9) so that events for the same subject are delivered to the same partition in order.

Consumer example:

from confluent_kafka import Consumer
import json

consumer = Consumer({
    "bootstrap.servers": "kafka-1.acme.internal:9092",
    "group.id": "audit-consumer",
    "auto.offset.reset": "earliest",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-512",
    "sasl.username": "audit-consumer",
    "sasl.password": "...",
})

consumer.subscribe(["coresdk-events"])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue

    event = json.loads(msg.value())
    event_type = msg.headers().get("ce_type", b"").decode()

    if event_type == "io.coresdk.policy.decision":
        handle_policy_decision(event["data"])
    elif event_type == "io.coresdk.auth.denied":
        handle_auth_denial(event["data"])

source and subject fields

The source field is the URI you configure via cloud_events_source. It should identify the service that produced the event — typically the public or internal base URL of the service.

The subject field is set automatically by CoreSDK and follows this structure:

tenant/{tenant_id}/user/{user_id}

For events not tied to a specific user (e.g. io.coresdk.tenant.created):

tenant/{tenant_id}

Security

CloudEvents export goes through the same egress path as OTEL export and is subject to PII masking. User IDs and tenant IDs appear in the subject and data fields but variable values from request bodies are never included.

If you need to gate which event types are exported, use the customer_egress_policy field:

cloud_events:
  enabled: true
  source: https://orders.acme.com
  binding: kafka
  customer_egress_policy: "acme/cloudevents-egress"
  kafka:
    brokers:
      - kafka-1.acme.internal:9092
    topic: coresdk-events

The Rego policy receives input.event_type and input.subject and must return allow = true for the event to be published.


Next steps

On this page