AI API Proxy Pub/Sub Pattern

Implement publish-subscribe messaging for AI services. Decouple producers from consumers, build real-time event pipelines, and enable fan-out architectures for scalable AI operations.

Pub/Sub Architecture

Publishers: API Gateway, AI Service, Worker
↓ Publish Events ↓
Broker: Message Broker
↓ Fan-Out ↓
Subscribers: Analytics, Notifications, Storage

Pub/Sub Benefits

Advantages of publish-subscribe for AI systems.

Loose Coupling

Publishers and subscribers operate independently. Add new subscribers without modifying publishers or existing consumers.

Fan-Out Broadcasting

A single published message is delivered to every subscriber of the topic. This suits analytics, logging, and notifications, which all need to see the same events.
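
Fan-out can be sketched with a toy in-memory broker. `InMemoryBroker` and `demo_fan_out` are hypothetical names used only for illustration, and a real broker would deliver over the network rather than by calling handlers directly.

```python
import asyncio
from collections import defaultdict


class InMemoryBroker:
    """Toy broker: each subscriber of a topic gets its own copy of a message."""

    def __init__(self):
        self._subscribers = defaultdict(list)  # topic -> list of async handlers

    def subscribe(self, topic, handler):
        self._subscribers[topic].append(handler)

    async def publish(self, topic, message):
        # Fan-out: hand a copy of the message to every subscriber concurrently.
        handlers = self._subscribers[topic]
        await asyncio.gather(*(handler(dict(message)) for handler in handlers))
        return len(handlers)


async def demo_fan_out():
    broker = InMemoryBroker()
    received = {"analytics": [], "notifications": [], "storage": []}

    def make_handler(name):
        async def handler(message):
            received[name].append(message)
        return handler

    for name in received:
        broker.subscribe("ai.completions", make_handler(name))

    delivered = await broker.publish("ai.completions", {"request_id": "req-1"})
    return delivered, received
```

Running `asyncio.run(demo_fan_out())` publishes one event and records it once per subscriber.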

Asynchronous Processing

Publishers return as soon as the broker accepts the message. Subscribers process events at their own pace without blocking the publisher.
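
A minimal sketch of this decoupling, assuming an `asyncio.Queue` stands in for the broker (the function and variable names are illustrative): publishing only enqueues the event, so the publisher finishes before the slow subscriber has processed anything.

```python
import asyncio


async def demo_async_processing():
    queue: asyncio.Queue = asyncio.Queue()
    processed = []

    async def slow_subscriber():
        while True:
            event = await queue.get()
            await asyncio.sleep(0.01)  # simulate slow downstream work
            processed.append(event["request_id"])
            queue.task_done()

    worker = asyncio.create_task(slow_subscriber())

    # Publishing only enqueues, so it returns before any event is processed.
    for i in range(3):
        queue.put_nowait({"request_id": f"req-{i}"})
    processed_at_publish_time = len(processed)

    await queue.join()  # wait until the subscriber has drained the queue
    worker.cancel()
    return processed_at_publish_time, processed
```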

Independent Scaling

Scale subscribers independently based on processing needs. Different consumers can have different throughput requirements.

Fault Isolation

Subscriber failures don't affect publishers or other subscribers. Each consumer handles its own errors.
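
One way to get this isolation in a single process is to collect handler exceptions instead of propagating them. The sketch below assumes async handlers keyed by subscriber name; `deliver_isolated` is a hypothetical helper, not part of any broker library.

```python
import asyncio


async def deliver_isolated(handlers, event):
    """Run every handler; a failure in one is recorded, not propagated."""
    names = list(handlers)
    results = await asyncio.gather(
        *(handlers[name](event) for name in names),
        return_exceptions=True,  # exceptions come back as values
    )
    return {
        name: "failed" if isinstance(result, Exception) else "ok"
        for name, result in zip(names, results)
    }


async def demo_fault_isolation():
    async def analytics(event):
        return None

    async def notifications(event):
        raise RuntimeError("notification channel unreachable")

    async def storage(event):
        return None

    handlers = {
        "analytics": analytics,
        "notifications": notifications,
        "storage": storage,
    }
    return await deliver_isolated(handlers, {"request_id": "req-1"})
```

Here the failing notifications handler is reported as failed while analytics and storage still complete.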

Flexible Routing

Route events based on topics, patterns, or content. Implement complex event flows with simple subscriptions.
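
Pattern-based routing can be illustrated with shell-style wildcards from Python's standard `fnmatch` module. This is only a sketch: the subscription table is invented for the example, and real brokers define their own pattern syntax, which may treat `*` differently.

```python
from fnmatch import fnmatchcase

# Hypothetical subscription table: subscriber name -> topic pattern.
SUBSCRIPTIONS = {
    "audit-log": "ai.*",
    "alert-service": "ai.errors",
    "billing-service": "ai.completions",
}


def matching_subscribers(subscriptions, topic):
    """Return the names of subscribers whose pattern matches the topic."""
    return sorted(
        name
        for name, pattern in subscriptions.items()
        if fnmatchcase(topic, pattern)
    )
```

With this table, an `ai.errors` event goes to both `alert-service` and `audit-log`, while an `ai.metrics` event reaches only `audit-log`.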

Pub/Sub Topics

Common topics for AI API proxy systems.

ai.requests

Published when a new AI request is received through the gateway. Contains request details and metadata.

Subscribers
  • Rate Limiter Service
  • Request Logger
  • Cost Tracker
  • Analytics Pipeline
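
Because several subscribers read the same event, a small typed envelope can help keep the payload consistent. The sketch below uses the `request_id`, `model`, and `timestamp` fields from the publisher example in the Implementation Guide; the `AIRequestEvent` class itself is hypothetical.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class AIRequestEvent:
    """Payload for the ai.requests topic (illustrative schema)."""

    request_id: str
    model: str
    timestamp: str  # ISO 8601 string

    def to_message(self) -> str:
        # Serialize to the JSON string that is handed to the broker.
        return json.dumps(asdict(self), sort_keys=True)

    @classmethod
    def from_message(cls, raw: str) -> "AIRequestEvent":
        # Raises TypeError if the message has missing or unexpected fields.
        return cls(**json.loads(raw))
```

Subscribers can call `AIRequestEvent.from_message(message.data)` to reject malformed events early instead of failing deep inside a handler.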

ai.completions

Published when an AI request completes successfully. Contains response data and performance metrics.

Subscribers
  • Response Cache
  • Billing Service
  • Quality Monitor
  • WebSocket Gateway

ai.errors

Published when an AI request fails. Contains error details for monitoring and retry processing.

Subscribers
  • Error Tracker (Sentry)
  • Alert System
  • Dead Letter Queue
  • Root Cause Analysis
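
Retry processing with a dead letter queue might look like the sketch below. `handle_with_retry` is a hypothetical helper, the dead letter queue is modeled as a plain list, and backoff between attempts is omitted to keep the example short.

```python
import asyncio


async def handle_with_retry(handler, event, dead_letters, max_attempts=3):
    """Try the handler up to max_attempts times, then dead-letter the event.

    Returns the attempt number that succeeded, or None if all attempts failed.
    """
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            await handler(event)
            return attempt
        except Exception as exc:
            last_error = exc
    dead_letters.append({"event": event, "error": str(last_error)})
    return None


async def demo_retry():
    dead_letters = []
    calls = {"flaky": 0}

    async def flaky(event):
        # Fails on the first two calls, succeeds on the third.
        calls["flaky"] += 1
        if calls["flaky"] < 3:
            raise TimeoutError("upstream timeout")

    async def broken(event):
        raise ValueError("bad payload")

    succeeded_on = await handle_with_retry(flaky, {"request_id": "req-1"}, dead_letters)
    failed = await handle_with_retry(broken, {"request_id": "req-2"}, dead_letters)
    return succeeded_on, failed, dead_letters
```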

ai.metrics

Periodic metrics about API usage, performance, and resource consumption for monitoring.

Subscribers
  • Time-Series Database
  • Dashboard Service
  • Auto-Scaler
  • Cost Reporter
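
As a rough illustration of what a periodic metrics event could contain, the sketch below aggregates completion events per model. It assumes each event carries `model` and `latency_ms` fields; the `model` field and the `summarize_completions` function are assumptions made for this example.

```python
from collections import defaultdict
from statistics import mean


def summarize_completions(events):
    """Aggregate request counts and average latency per model."""
    latencies = defaultdict(list)
    for event in events:
        latencies[event["model"]].append(event["latency_ms"])
    return {
        model: {"requests": len(values), "avg_latency_ms": mean(values)}
        for model, values in latencies.items()
    }
```

A metrics publisher could run this over each reporting window and publish the resulting dict to `ai.metrics`.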

Implementation Guide

Build a pub/sub enabled AI gateway.

pubsub_gateway.py Python
import json
import logging
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable, Optional

logger = logging.getLogger(__name__)

# A handler is an async function that receives the decoded event dict.
EventHandler = Callable[[dict], Awaitable[Any]]


class PubSubGateway:
    """Pub/Sub enabled AI API gateway"""

    def __init__(self, broker_client):
        # Any async client exposing publish() and subscribe() works here.
        self.broker = broker_client
        self.handlers = {}

    async def publish(
        self,
        topic: str,
        event: dict,
        headers: Optional[dict] = None
    ):
        """Publish event to topic"""
        # The event must be JSON-serializable.
        await self.broker.publish(
            topic=topic,
            message=json.dumps(event),
            headers=headers or {}
        )

    async def subscribe(
        self,
        topic: str,
        subscriber_name: str,
        handler: EventHandler
    ):
        """Subscribe to topic with handler"""
        self.handlers[subscriber_name] = handler
        await self.broker.subscribe(
            topic=topic,
            subscription=subscriber_name,
            callback=self._create_callback(subscriber_name)
        )

    def _create_callback(self, name):
        async def callback(message):
            try:
                event = json.loads(message.data)
                await self.handlers[name](event)
                await message.ack()
            except Exception:
                # Log first so the failure is recorded even if nack() raises.
                logger.exception("Handler %s failed", name)
                await message.nack()
        return callback

# Usage example
# GooglePubSubClient, cache, alert_service, and ai_client are placeholders
# for objects your application provides; they are not defined here.
async def setup_gateway():
    gateway = PubSubGateway(GooglePubSubClient())

    # Subscribe to completion events
    async def handle_completion(event):
        # Cache the response
        await cache.set(
            event['request_id'],
            event['response']
        )

    await gateway.subscribe(
        topic='ai.completions',
        subscriber_name='cache-service',
        handler=handle_completion
    )

    # Subscribe to error events
    async def handle_error(event):
        # Send alert
        await alert_service.send(
            f"AI Error: {event['error']}"
        )

    await gateway.subscribe(
        topic='ai.errors',
        subscriber_name='alert-service',
        handler=handle_error
    )

    return gateway

# Publisher example
async def process_request(gateway, request):
    # Publish request event
    await gateway.publish(
        topic='ai.requests',
        event={
            'request_id': request.id,
            'model': request.model,
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
            'timestamp': datetime.now(timezone.utc).isoformat()
        }
    )

    # Process AI request; report failures on ai.errors, then re-raise
    try:
        result = await ai_client.complete(request)
    except Exception as e:
        await gateway.publish(
            topic='ai.errors',
            event={'request_id': request.id, 'error': str(e)}
        )
        raise

    # Publish completion event (result must be JSON-serializable;
    # convert it first if ai_client returns a custom object)
    await gateway.publish(
        topic='ai.completions',
        event={
            'request_id': request.id,
            'response': result,
            'latency_ms': result.latency
        }
    )
    return result
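
The gateway above assumes a broker client with async `publish` and `subscribe` methods, and messages that expose `data`, `ack()`, and `nack()`. For local tests, an in-memory stand-in with the same shape can be sketched as follows. `FakeBroker` and `FakeMessage` are hypothetical classes for illustration, not part of any real client library.

```python
import asyncio
import json
from collections import defaultdict


class FakeMessage:
    """Mimics the message interface the gateway callback relies on."""

    def __init__(self, data):
        self.data = data
        self.status = "pending"

    async def ack(self):
        self.status = "acked"

    async def nack(self):
        self.status = "nacked"


class FakeBroker:
    """In-memory broker exposing the publish/subscribe calls the gateway uses."""

    def __init__(self):
        self._callbacks = defaultdict(dict)  # topic -> {subscription: callback}
        self.delivered = []

    async def subscribe(self, topic, subscription, callback):
        self._callbacks[topic][subscription] = callback

    async def publish(self, topic, message, headers=None):
        # Deliver a separate message object to each subscription on the topic.
        for callback in list(self._callbacks[topic].values()):
            msg = FakeMessage(message)
            self.delivered.append(msg)
            await callback(msg)


async def demo_fake_broker():
    broker = FakeBroker()
    seen = []

    async def callback(message):
        seen.append(json.loads(message.data))
        await message.ack()

    await broker.subscribe(
        topic="ai.completions", subscription="cache-service", callback=callback
    )
    await broker.publish(
        topic="ai.completions",
        message=json.dumps({"request_id": "req-1"}),
        headers={},
    )
    return seen, [m.status for m in broker.delivered]
```

Passing `FakeBroker()` to `PubSubGateway` in place of a real client lets handlers be exercised without network access, and the recorded message statuses show whether each delivery was acked or nacked.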
