Infrastructure Track · Medium · 1-2 days

Build an Event Pipeline

Create a mini version of Tenzai's event ingestion system: SNS → SQS → Lambda → DynamoDB.

🎯 The Mission

Tenzai's agent emits events as it works. These events need to be captured, processed, and stored reliably. Decoupling producers from consumers this way is a classic event-driven architecture pattern seen throughout AWS systems.

Build your own event pipeline from scratch. Publish test events and watch them flow through to storage.

🏖️ Sandbox Rules

Prefix every resource with your name ({name}-...) and tag it with Project = onboarding and Owner = your name, as the Terraform below does.

Architecture

📊 EVENT PIPELINE
┌─────────────────────────────────────────────────────────────────┐
│  Publisher (CLI or Test Script)                                 │
│  aws sns publish --topic-arn ... --message '{"type":"test"}'   │
└──────────────────────────────┬──────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────────┐
│  SNS Topic                                                      │
│  {name}-events                                                  │
│  - Fan-out capability (multiple subscribers)                    │
└──────────────────────────────┬──────────────────────────────────┘
                               │
                    ┌──────────┴──────────┐
                    │                     │
                    ▼                     ▼
┌───────────────────────────┐  ┌─────────────────────────────────┐
│  SQS Queue                │  │  S3 Bucket (Raw Archive)        │
│  {name}-events-queue      │  │  {name}-events-raw              │
│  - DLQ for failed msgs    │  │  - All events for audit         │
└─────────────┬─────────────┘  └─────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────────┐
│  Lambda Function                                                │
│  {name}-event-processor                                         │
│  - Parse event JSON                                             │
│  - Transform/validate                                           │
│  - Write to DynamoDB                                            │
└──────────────────────────────┬──────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────────┐
│  DynamoDB Table                                                 │
│  {name}-events                                                  │
│  - PK: event_type                                               │
│  - SK: timestamp#event_id                                       │
└─────────────────────────────────────────────────────────────────┘
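
One caveat about the diagram: SNS has no native S3 delivery, so the raw-archive branch needs an intermediary. A common choice is a Kinesis Data Firehose subscription. A minimal sketch of that branch, where the two IAM roles (Firehose writing to S3, SNS publishing to Firehose) are hypothetical and left out:

# S3 bucket for the raw event archive
resource "aws_s3_bucket" "events_raw" {
  bucket = "${var.name}-events-raw"
}

# Firehose delivery stream that batches events into the bucket
resource "aws_kinesis_firehose_delivery_stream" "events_raw" {
  name        = "${var.name}-events-raw"
  destination = "extended_s3"

  extended_s3_configuration {
    role_arn   = aws_iam_role.firehose.arn  # hypothetical role with s3:PutObject
    bucket_arn = aws_s3_bucket.events_raw.arn
  }
}

# Second fan-out subscription on the same topic
resource "aws_sns_topic_subscription" "firehose" {
  topic_arn             = aws_sns_topic.events.arn
  protocol              = "firehose"
  endpoint              = aws_kinesis_firehose_delivery_stream.events_raw.arn
  subscription_role_arn = aws_iam_role.sns_firehose.arn  # hypothetical role SNS assumes
}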
            

What You'll Learn

- SNS topics and fan-out to multiple subscribers
- SQS queues, visibility timeouts, and dead-letter queues
- Lambda event source mappings for queue-driven processing
- DynamoDB key design (partition key + sort key) and TTL-based expiry

Implementation Guide

Step 1: Create SNS Topic

resource "aws_sns_topic" "events" {
  name = "${var.name}-events"
  
  tags = {
    Project = "onboarding"
    Owner   = var.name
  }
}
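
Every snippet references var.name without declaring it; a minimal one-time declaration:

variable "name" {
  description = "Your name, used to prefix every resource"
  type        = string
}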

Step 2: Create SQS Queue with DLQ

resource "aws_sqs_queue" "events_dlq" {
  name = "${var.name}-events-dlq"
  message_retention_seconds = 1209600  # 14 days
}

resource "aws_sqs_queue" "events" {
  name = "${var.name}-events-queue"
  visibility_timeout_seconds = 60  # must cover the Lambda's processing time before SQS redelivers
  
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.events_dlq.arn
    maxReceiveCount     = 3
  })
}

# Subscribe SQS to SNS (raw_message_delivery defaults to false,
# so messages arrive wrapped in the SNS envelope; see Step 4)
resource "aws_sns_topic_subscription" "sqs" {
  topic_arn = aws_sns_topic.events.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.events.arn
}

# Allow SNS to send to SQS
resource "aws_sqs_queue_policy" "allow_sns" {
  queue_url = aws_sqs_queue.events.id
  
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect    = "Allow"
      Principal = { Service = "sns.amazonaws.com" }
      Action    = "sqs:SendMessage"
      Resource  = aws_sqs_queue.events.arn
      Condition = {
        ArnEquals = { "aws:SourceArn" = aws_sns_topic.events.arn }
      }
    }]
  })
}
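
Before the Lambda exists you can sanity-check the fan-out from the CLI; a sketch, with the ARN and URL as placeholders for your own values:

# Publish straight to the topic...
aws sns publish --topic-arn <your-topic-arn> --message '{"type":"test"}'

# ...and confirm the message lands on the queue
aws sqs receive-message --queue-url <your-queue-url>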

Step 3: Create DynamoDB Table

resource "aws_dynamodb_table" "events" {
  name         = "${var.name}-events"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "event_type"
  range_key    = "timestamp_id"

  attribute {
    name = "event_type"
    type = "S"
  }

  attribute {
    name = "timestamp_id"
    type = "S"
  }

  ttl {
    # DynamoDB TTL expects epoch seconds in this attribute (written in Step 4)
    attribute_name = "expires_at"
    enabled        = true
  }
}

Step 4: Create Lambda Function

# lambda/handler.py
import json
import os
from datetime import datetime, timezone

import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['TABLE_NAME'])

def handler(event, context):
    for record in event['Records']:
        # SQS delivers the SNS envelope (raw message delivery is off),
        # so unwrap twice: SQS body -> SNS envelope -> event JSON
        sns_message = json.loads(record['body'])
        event_data = json.loads(sns_message['Message'])

        now = datetime.now(timezone.utc)
        item = {
            'event_type': event_data.get('type', 'unknown'),
            'timestamp_id': f"{now.isoformat()}#{context.aws_request_id}",
            'data': event_data,
            'received_at': now.isoformat(),
            # Epoch seconds for the table's TTL attribute (30 days is an example value)
            'expires_at': int(now.timestamp()) + 30 * 24 * 3600,
        }

        table.put_item(Item=item)
        print(f"Processed event: {item['event_type']}")

    return {'statusCode': 200}
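
The handler alone won't deploy; Step 4 also needs Terraform to package the code, grant permissions, and connect the queue to the function. A minimal sketch (runtime, timeout, and batch size are assumed values):

# Package the handler into a zip for deployment
data "archive_file" "lambda" {
  type        = "zip"
  source_file = "${path.module}/lambda/handler.py"
  output_path = "${path.module}/lambda.zip"
}

# Execution role the function assumes
resource "aws_iam_role" "lambda" {
  name = "${var.name}-event-processor"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect    = "Allow"
      Principal = { Service = "lambda.amazonaws.com" }
      Action    = "sts:AssumeRole"
    }]
  })
}

# SQS polling and CloudWatch Logs permissions (AWS managed policy)
resource "aws_iam_role_policy_attachment" "sqs" {
  role       = aws_iam_role.lambda.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaSQSQueueExecutionRole"
}

# Write access to the events table
resource "aws_iam_role_policy" "dynamodb" {
  name = "dynamodb-put"
  role = aws_iam_role.lambda.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect   = "Allow"
      Action   = "dynamodb:PutItem"
      Resource = aws_dynamodb_table.events.arn
    }]
  })
}

resource "aws_lambda_function" "processor" {
  function_name    = "${var.name}-event-processor"
  role             = aws_iam_role.lambda.arn
  handler          = "handler.handler"
  runtime          = "python3.12"
  timeout          = 30  # keep below the queue's 60s visibility timeout
  filename         = data.archive_file.lambda.output_path
  source_code_hash = data.archive_file.lambda.output_base64sha256

  environment {
    variables = { TABLE_NAME = aws_dynamodb_table.events.name }
  }
}

# Poll the queue and invoke the function with batches of messages
resource "aws_lambda_event_source_mapping" "sqs" {
  event_source_arn = aws_sqs_queue.events.arn
  function_name    = aws_lambda_function.processor.arn
  batch_size       = 10
}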

Step 5: Test the Pipeline

# Publish a test event
aws sns publish \
  --topic-arn arn:aws:sns:us-east-1:123456789:your-name-events \
  --message '{"type": "finding.new", "severity": "high", "title": "Test Finding"}'

# Check DynamoDB
aws dynamodb scan --table-name your-name-events

# Check SQS DLQ for failed messages
aws sqs receive-message --queue-url https://sqs.../your-name-events-dlq
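
Scan reads the entire table; once events pile up, query by partition key instead. A sketch, using the finding.new type from the test event above (aws logs tail needs AWS CLI v2):

# Fetch one event type via the partition key
aws dynamodb query \
  --table-name your-name-events \
  --key-condition-expression "event_type = :t" \
  --expression-attribute-values '{":t": {"S": "finding.new"}}'

# Follow the processor's logs while publishing
aws logs tail /aws/lambda/your-name-event-processor --follow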

✓ Success Criteria

- A published test event shows up as an item in the DynamoDB table
- Raw events are archived in the S3 bucket
- A message that fails processing three times lands in the DLQ
- Every resource carries your name prefix and the onboarding tags

📋 Progress Checklist

- Step 1: SNS topic created
- Step 2: SQS queue and DLQ created and subscribed to the topic
- Step 3: DynamoDB table created
- Step 4: Lambda function deployed and processing queue messages
- Step 5: Test event published and verified end to end

Stretch Goals