Understand how agent events flow from emission to storage to API. Learn the CloudEvents pattern.
The agent emits events as it works: findings discovered, leads generated, tools called. These events power the real-time UI timeline. Understanding this flow is essential for debugging and extending the platform.
Trace an event from agent emission to API response. Then add support for a new event type.
┌─────────────────────────────────────────────────────────────────┐
│ AGENT │
│ emitter.emit_finding() → AgentEvent (Pydantic) │
└──────────────────────────────┬──────────────────────────────────┘
│ SNS Publish
▼
┌─────────────────────────────────────────────────────────────────┐
│ SNS TOPIC │
│ agent-events-{env} │
└──────────────────────────────┬──────────────────────────────────┘
│ SQS Subscribe
▼
┌─────────────────────────────────────────────────────────────────┐
│ SQS QUEUE │
│ agent-events-queue-{env} │
└──────────────────────────────┬──────────────────────────────────┘
│ Lambda/Worker polls
▼
┌─────────────────────────────────────────────────────────────────┐
│ EVENT PROCESSOR │
│ event_parser.py → parse_event() → ScanEvent model → DB │
└──────────────────────────────┬──────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ DATABASE │
│ scan_event table (event_type, event_data JSONB) │
└──────────────────────────────┬──────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ API ENDPOINT │
│ GET /v1/scans/{id}/events → ScanEventService → Response │
└─────────────────────────────────────────────────────────────────┘
Tenzai events follow the CloudEvents specification:
{
"type": "com.tenzai.agent.finding.new",
"source": "agent/bonzai",
"id": "uuid-here",
"time": "2025-01-27T10:00:00Z",
"data": {
"finding_id": "...",
"severity": "high",
"title": "SQL Injection in login endpoint"
}
}
Follow an event through the entire pipeline:
emit_finding() is called in agent/events/event_parser.py and discriminated unionsscan_event table/v1/scans/{id}/events queries and returns themAdd a com.tenzai.agent.screenshot.new event for browser screenshots.
In agent/events/schema/:
from pydantic import BaseModel
class ScreenshotEventData(BaseModel):
screenshot_id: str
url: str
s3_key: str
width: int
height: int
timestamp: datetime
# In the AgentEvent union
AgentEvent = Annotated[
FindingEvent | LeadEvent | ... | ScreenshotEvent,
Field(discriminator="type")
]
The event_parser.py should automatically handle it via the discriminator.
In parse_event_for_api(), add any sanitization or transformation needed:
def parse_event_for_api(event_data: dict, usage: dict) -> dict:
event_type = event_data.get("type", "")
if event_type == "com.tenzai.agent.screenshot.new":
# Maybe presign the S3 URL?
return {
**event_data,
"data": {
**event_data.get("data", {}),
"presigned_url": presign_s3_key(event_data["data"]["s3_key"])
}
}
# ... existing handlers