Create a mini version of Tenzai's event ingestion system: SNS → SQS → Lambda → DynamoDB.
Tenzai's agent emits events as it works. These events need to be reliably captured, processed, and stored. This is a classic event-driven architecture pattern used throughout AWS.
Build your own event pipeline from scratch. Publish test events and watch them flow through to storage.
tenzai-sandboxProject=onboarding, Owner=your-namealice-event-topic)
┌─────────────────────────────────────────────────────────────────┐
│ Publisher (CLI or Test Script) │
│ aws sns publish --topic-arn ... --message '{"type":"test"}' │
└──────────────────────────────┬──────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ SNS Topic │
│ {name}-events │
│ - Fan-out capability (multiple subscribers) │
└──────────────────────────────┬──────────────────────────────────┘
│
┌──────────┴──────────┐
│ │
▼ ▼
┌───────────────────────────┐ ┌─────────────────────────────────┐
│ SQS Queue │ │ S3 Bucket (Raw Archive) │
│ {name}-events-queue │ │ {name}-events-raw │
│ - DLQ for failed msgs │ │ - All events for audit │
└─────────────┬─────────────┘ └─────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Lambda Function │
│ {name}-event-processor │
│ - Parse event JSON │
│ - Transform/validate │
│ - Write to DynamoDB │
└──────────────────────────────┬──────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ DynamoDB Table │
│ {name}-events │
│ - PK: event_type │
│ - SK: timestamp#event_id │
└─────────────────────────────────────────────────────────────────┘
resource "aws_sns_topic" "events" {
name = "${var.name}-events"
tags = {
Project = "onboarding"
Owner = var.name
}
}
resource "aws_sqs_queue" "events_dlq" {
name = "${var.name}-events-dlq"
message_retention_seconds = 1209600 # 14 days
}
resource "aws_sqs_queue" "events" {
name = "${var.name}-events-queue"
visibility_timeout_seconds = 60
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.events_dlq.arn
maxReceiveCount = 3
})
}
# Subscribe SQS to SNS
resource "aws_sns_topic_subscription" "sqs" {
topic_arn = aws_sns_topic.events.arn
protocol = "sqs"
endpoint = aws_sqs_queue.events.arn
}
# Allow SNS to send to SQS
resource "aws_sqs_queue_policy" "allow_sns" {
queue_url = aws_sqs_queue.events.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Principal = { Service = "sns.amazonaws.com" }
Action = "sqs:SendMessage"
Resource = aws_sqs_queue.events.arn
Condition = {
ArnEquals = { "aws:SourceArn" = aws_sns_topic.events.arn }
}
}]
})
}
resource "aws_dynamodb_table" "events" {
name = "${var.name}-events"
billing_mode = "PAY_PER_REQUEST"
hash_key = "event_type"
range_key = "timestamp_id"
attribute {
name = "event_type"
type = "S"
}
attribute {
name = "timestamp_id"
type = "S"
}
ttl {
attribute_name = "expires_at"
enabled = true
}
}
# lambda/handler.py
import json
import boto3
import os
from datetime import datetime
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['TABLE_NAME'])
def handler(event, context):
for record in event['Records']:
# Parse SNS message from SQS
sns_message = json.loads(record['body'])
event_data = json.loads(sns_message['Message'])
# Write to DynamoDB
item = {
'event_type': event_data.get('type', 'unknown'),
'timestamp_id': f"{datetime.utcnow().isoformat()}#{context.aws_request_id}",
'data': event_data,
'received_at': datetime.utcnow().isoformat(),
}
table.put_item(Item=item)
print(f"Processed event: {item['event_type']}")
return {'statusCode': 200}
# Publish a test event
aws sns publish \
--topic-arn arn:aws:sns:us-east-1:123456789:your-name-events \
--message '{"type": "finding.new", "severity": "high", "title": "Test Finding"}'
# Check DynamoDB
aws dynamodb scan --table-name your-name-events
# Check SQS DLQ for failed messages
aws sqs receive-message --queue-url https://sqs.../your-name-events-dlq