Platform Track | Medium-Hard | 3-5 hours

Event Processing Deep Dive

Understand how agent events flow from emission to storage to API. Learn the CloudEvents pattern.

🎯 The Mission

The agent emits events as it works: findings discovered, leads generated, tools called. These events power the real-time UI timeline. Understanding this flow is essential for debugging and extending the platform.

Trace an event from agent emission to API response. Then add support for a new event type.

Event Flow Architecture

EVENT PIPELINE
┌─────────────────────────────────────────────────────────────────┐
│                         AGENT                                    │
│  emitter.emit_finding() → AgentEvent (Pydantic)                 │
└──────────────────────────────┬──────────────────────────────────┘
                               │ SNS Publish
                               ▼
┌─────────────────────────────────────────────────────────────────┐
│                       SNS TOPIC                                  │
│  agent-events-{env}                                             │
└──────────────────────────────┬──────────────────────────────────┘
                               │ SQS Subscribe
                               ▼
┌─────────────────────────────────────────────────────────────────┐
│                       SQS QUEUE                                  │
│  agent-events-queue-{env}                                       │
└──────────────────────────────┬──────────────────────────────────┘
                               │ Lambda/Worker polls
                               ▼
┌─────────────────────────────────────────────────────────────────┐
│                    EVENT PROCESSOR                               │
│  event_parser.py → parse_event() → ScanEvent model → DB        │
└──────────────────────────────┬──────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────────┐
│                      DATABASE                                    │
│  scan_event table (event_type, event_data JSONB)                │
└──────────────────────────────┬──────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────────┐
│                     API ENDPOINT                                 │
│  GET /v1/scans/{id}/events → ScanEventService → Response        │
└─────────────────────────────────────────────────────────────────┘
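
The last two stages of the pipeline (storage and retrieval) can be sketched with the standard library. This is a minimal stand-in, not the platform's actual code: SQLite replaces the real database, a TEXT column of JSON replaces JSONB, and the scan_id column and the function names are assumptions made for illustration. It also assumes event_data holds the whole event envelope.

```python
import json
import sqlite3


def create_store() -> sqlite3.Connection:
    """Create an in-memory table shaped like scan_event (assumed columns)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE scan_event (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            scan_id TEXT NOT NULL,     -- assumed: links an event to its scan
            event_type TEXT NOT NULL,
            event_data TEXT NOT NULL   -- JSON text standing in for JSONB
        )
        """
    )
    return conn


def store_event(conn: sqlite3.Connection, scan_id: str, event: dict) -> None:
    """Persist one parsed event, keeping the full envelope as JSON."""
    conn.execute(
        "INSERT INTO scan_event (scan_id, event_type, event_data) VALUES (?, ?, ?)",
        (scan_id, event["type"], json.dumps(event)),
    )


def list_events(conn: sqlite3.Connection, scan_id: str) -> list[dict]:
    """Return a scan's events in insertion order, as an events endpoint might."""
    rows = conn.execute(
        "SELECT event_data FROM scan_event WHERE scan_id = ? ORDER BY id",
        (scan_id,),
    )
    return [json.loads(row[0]) for row in rows]
```

Keeping event_type in its own column makes it possible to filter by type without parsing the JSON payload.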
            

CloudEvents Standard

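Tenzai events follow the CloudEvents specification's envelope: type, source, id, and time describe the event, and data carries the payload. CloudEvents 1.0 also requires a specversion attribute, which this abbreviated example does not show: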

{
    "type": "com.tenzai.agent.finding.new",
    "source": "agent/bonzai",
    "id": "uuid-here",
    "time": "2025-01-27T10:00:00Z",
    "data": {
        "finding_id": "...",
        "severity": "high",
        "title": "SQL Injection in login endpoint"
    }
}
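
To make the envelope concrete, here is a minimal sketch that builds an event of this shape using only the standard library. The make_event helper is hypothetical and is not part of the agent's emitter. The specversion field is included as an assumption because CloudEvents 1.0 requires it.

```python
import uuid
from datetime import datetime, timezone


def make_event(event_type: str, source: str, data: dict) -> dict:
    """Build a CloudEvents-style envelope around an event payload."""
    return {
        "specversion": "1.0",  # assumption: not shown in the example above
        "type": event_type,
        "source": source,
        "id": str(uuid.uuid4()),
        # UTC timestamp in the same format as the example above
        "time": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "data": data,
    }


event = make_event(
    "com.tenzai.agent.finding.new",
    "agent/bonzai",
    {"finding_id": "f-1", "severity": "high", "title": "SQL Injection in login endpoint"},
)
```

The envelope is a plain dict, so it can be passed straight to json.dumps before publishing.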

Current Event Types

com.tenzai.agent.finding.new
com.tenzai.agent.lead.new
com.tenzai.agent.lead.update
com.tenzai.agent.endpoint.new
com.tenzai.agent.thought.new
com.tenzai.agent.tool_call.new
com.tenzai.agent.agent_scope.new
com.tenzai.agent.task.new
com.tenzai.agent.error.new

Exercise 1: Trace the Flow

Follow an event through the entire pipeline:

  1. Find where emit_finding() is called in agent/events/
  2. See how the event is serialized and published to SNS
  3. Find the SQS consumer that processes incoming events
  4. Trace through event_parser.py and discriminated unions
  5. See how events are stored in scan_event table
  6. Find how /v1/scans/{id}/events queries and returns them
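
One detail worth knowing for step 3: when SNS delivers to SQS without raw message delivery enabled, the SQS message body is a JSON notification wrapper, and the published event sits as a string in its Message field. The sketch below unwraps either form. The function name is an assumption, and the real consumer may handle this differently.

```python
import json


def unwrap_sns_message(sqs_body: str) -> dict:
    """Return the event dict from an SQS body, with or without the SNS wrapper."""
    body = json.loads(sqs_body)
    # SNS notifications carry Type="Notification" and a string Message field.
    if isinstance(body, dict) and body.get("Type") == "Notification" and "Message" in body:
        return json.loads(body["Message"])
    # Raw message delivery: the body is already the event itself.
    return body


sample_event = {"type": "com.tenzai.agent.finding.new", "data": {"severity": "high"}}
wrapped_body = json.dumps({"Type": "Notification", "Message": json.dumps(sample_event)})
raw_body = json.dumps(sample_event)
```

Both wrapped_body and raw_body unwrap to the same event dict, so downstream parsing does not need to know which delivery mode the subscription uses.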

Exercise 2: Add a New Event Type

Add a com.tenzai.agent.screenshot.new event for browser screenshots.

Step 1: Define the Event Schema (Agent Side)

In agent/events/schema/:

from datetime import datetime

from pydantic import BaseModel

class ScreenshotEventData(BaseModel):
    """Payload for com.tenzai.agent.screenshot.new events."""
    screenshot_id: str
    url: str
    s3_key: str
    width: int
    height: int
    timestamp: datetime

Step 2: Add to Discriminated Union

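from typing import Annotated
from pydantic import Field

# In the AgentEvent union ("..." stands for the existing event types).
# ScreenshotEvent needs a Literal "type" field for the discriminator to match.
AgentEvent = Annotated[
    FindingEvent | LeadEvent | ... | ScreenshotEvent,
    Field(discriminator="type")
]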

Step 3: Handle in Platform Parser

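If event_parser.py validates incoming payloads against the same AgentEvent union, the discriminator should route the new type automatically, with no parser changes. Confirm this by parsing a sample screenshot payload.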
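
A self-contained sketch of how discriminator-based parsing behaves, assuming Pydantic v2. The FindingEvent and ScreenshotEvent envelope classes and the parse_agent_event function are illustrative stand-ins, not the platform's real models or its parse_event(). Each member of the union declares a Literal type field, and Pydantic uses that field to pick the model.

```python
from datetime import datetime
from typing import Annotated, Literal, Union

from pydantic import BaseModel, Field, TypeAdapter, ValidationError


class FindingEventData(BaseModel):
    finding_id: str
    severity: str
    title: str


class FindingEvent(BaseModel):
    type: Literal["com.tenzai.agent.finding.new"]
    data: FindingEventData


class ScreenshotEventData(BaseModel):
    screenshot_id: str
    url: str
    s3_key: str
    width: int
    height: int
    timestamp: datetime


class ScreenshotEvent(BaseModel):
    type: Literal["com.tenzai.agent.screenshot.new"]
    data: ScreenshotEventData


# Two-member stand-in for the full AgentEvent union.
AgentEvent = Annotated[
    Union[FindingEvent, ScreenshotEvent],
    Field(discriminator="type"),
]
_adapter = TypeAdapter(AgentEvent)


def parse_agent_event(payload: dict) -> Union[FindingEvent, ScreenshotEvent]:
    """Validate a payload and return the model selected by its "type" field."""
    return _adapter.validate_python(payload)


screenshot_payload = {
    "type": "com.tenzai.agent.screenshot.new",
    "source": "agent/bonzai",  # extra envelope fields are ignored by default
    "data": {
        "screenshot_id": "s-1",
        "url": "https://example.com/login",
        "s3_key": "screenshots/s-1.png",
        "width": 1280,
        "height": 720,
        "timestamp": "2025-01-27T10:00:00Z",
    },
}
parsed = parse_agent_event(screenshot_payload)
```

A payload whose type is not in the union raises ValidationError, so a parser that has not picked up the updated union would reject screenshot events rather than silently drop them.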

Step 4: Update API Response

In parse_event_for_api(), add any sanitization or transformation needed:

def parse_event_for_api(event_data: dict, usage: dict) -> dict:
    event_type = event_data.get("type", "")

    if event_type == "com.tenzai.agent.screenshot.new":
        # One option: attach a presigned URL so clients can fetch the image.
        # presign_s3_key is a placeholder helper that you would need to write.
        data = event_data.get("data", {})
        return {
            **event_data,
            "data": {**data, "presigned_url": presign_s3_key(data["s3_key"])},
        }

    # ... existing handlers
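
As the number of event types grows, a chain of if statements can become hard to follow. One possible alternative, sketched below, is a registry that maps each event type to a transform function. Everything here is hypothetical: transform_for_api, HANDLERS, and the injected presign callable are illustration names, not the platform's API. Passing the presigner in as an argument keeps the sketch testable without AWS credentials.

```python
from typing import Callable

Presigner = Callable[[str], str]
Handler = Callable[[dict, Presigner], dict]


def _screenshot_handler(event: dict, presign: Presigner) -> dict:
    """Add a presigned URL next to the S3 key, leaving the input untouched."""
    data = dict(event.get("data", {}))
    if "s3_key" in data:
        data["presigned_url"] = presign(data["s3_key"])
    return {**event, "data": data}


# Registry of per-type transforms; types without an entry pass through as-is.
HANDLERS: dict[str, Handler] = {
    "com.tenzai.agent.screenshot.new": _screenshot_handler,
}


def transform_for_api(event: dict, presign: Presigner) -> dict:
    """Apply the registered transform for the event's type, if there is one."""
    handler = HANDLERS.get(event.get("type", ""))
    return handler(event, presign) if handler else event


def fake_presign(key: str) -> str:
    """Test double; a real presigner would call the storage service."""
    return f"https://example.invalid/{key}?sig=fake"


api_event = transform_for_api(
    {"type": "com.tenzai.agent.screenshot.new", "data": {"s3_key": "screenshots/s-1.png"}},
    fake_presign,
)
```

Adding support for another event type then means registering one more function rather than editing the dispatch logic.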

✓ Success Criteria

Stretch Goals