Real-Time Reddit Monitoring System Architecture
Build production monitoring systems that detect brand mentions, sentiment shifts, and trending topics in real time
Real-time Reddit monitoring enables brands to respond to crises before they escalate, capture emerging trends ahead of competitors, and maintain awareness of customer sentiment. This guide covers building monitoring systems that process thousands of posts per minute with sub-minute alerting latency.
System Requirements
Define your monitoring requirements before choosing an architecture. Different use cases demand different trade-offs between latency, throughput, and cost; a configuration sketch that encodes these targets follows the table.
| Use Case | Latency Target | Volume | Alerting | Complexity |
|---|---|---|---|---|
| Crisis Detection | <1 minute | High | Immediate | High |
| Brand Monitoring | <5 minutes | Medium | Near real-time | Medium |
| Trend Detection | <15 minutes | High | Periodic | Medium |
| Competitive Intel | <1 hour | Medium | Daily digest | Low |
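These targets translate directly into pipeline settings such as poll frequency and alert batching. The sketch below is one illustrative way to encode them as configuration; the `MonitoringProfile` dataclass and its preset values are assumptions for this guide, not part of any library.

```python
from dataclasses import dataclass


@dataclass
class MonitoringProfile:
    """Illustrative mapping of a use case to pipeline settings (hypothetical)."""
    name: str
    poll_interval_seconds: int     # how often ingestion polls Reddit
    alert_mode: str                # 'immediate', 'near_realtime', 'periodic', 'digest'
    max_alert_latency_seconds: int # target from the table above


# Presets roughly mirroring the table above
PROFILES = {
    'crisis_detection':  MonitoringProfile('crisis_detection', 15, 'immediate', 60),
    'brand_monitoring':  MonitoringProfile('brand_monitoring', 60, 'near_realtime', 300),
    'trend_detection':   MonitoringProfile('trend_detection', 300, 'periodic', 900),
    'competitive_intel': MonitoringProfile('competitive_intel', 900, 'digest', 3600),
}
```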
Reference Architecture
Design for horizontal scaling, graceful degradation under load, and idempotent processing. Every component should be stateless where possible to enable easy scaling.
```
┌─────────────────────────────────────────────────────────────────────┐
│                           INGESTION LAYER                           │
├─────────────────────────────────────────────────────────────────────┤
│   ┌──────────────┐    ┌──────────────┐    ┌──────────────┐          │
│   │  Reddit API  │    │  Pushshift   │    │   Webhook    │          │
│   │ Poller (x3)  │    │    Stream    │    │   Receiver   │          │
│   └──────┬───────┘    └──────┬───────┘    └──────┬───────┘          │
│          │                   │                   │                  │
│          └───────────────────┼───────────────────┘                  │
│                              ▼                                      │
│                     ┌─────────────────┐                             │
│                     │  Message Queue  │                             │
│                     │   (Kafka/SQS)   │                             │
│                     └────────┬────────┘                             │
└──────────────────────────────┼──────────────────────────────────────┘
                               ▼
┌─────────────────────────────────────────────────────────────────────┐
│                          PROCESSING LAYER                           │
├─────────────────────────────────────────────────────────────────────┤
│   ┌──────────────────────────────────────────────────────────────┐  │
│   │                       Stream Processor                        │  │
│   │   ┌────────────┐   ┌────────────┐   ┌────────────┐           │  │
│   │   │   Dedup    │ → │  Classify  │ → │   Enrich   │           │  │
│   │   └────────────┘   └────────────┘   └────────────┘           │  │
│   └──────────────────────────┬───────────────────────────────────┘  │
│                              │                                      │
│          ┌───────────────────┼───────────────────┐                  │
│          ▼                   ▼                   ▼                  │
│    ┌────────────┐      ┌────────────┐      ┌────────────┐           │
│    │   Alert    │      │ Aggregate  │      │   Store    │           │
│    │   Engine   │      │  (Window)  │      │   (Raw)    │           │
│    └─────┬──────┘      └─────┬──────┘      └─────┬──────┘           │
└──────────┼───────────────────┼───────────────────┼──────────────────┘
           │                   │                   │
           ▼                   ▼                   ▼
┌─────────────────────────────────────────────────────────────────────┐
│                            OUTPUT LAYER                             │
├─────────────────────────────────────────────────────────────────────┤
│    ┌────────────┐      ┌────────────┐      ┌────────────┐           │
│    │   Slack/   │      │ Dashboard  │      │    Data    │           │
│    │ Email/SMS  │      │ (Grafana)  │      │ Warehouse  │           │
│    └────────────┘      └────────────┘      └────────────┘           │
└─────────────────────────────────────────────────────────────────────┘
```
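The dedup step in the processing layer is what makes the pipeline idempotent: a post that arrives twice (API retries, overlapping pollers, worker restarts) must produce only one downstream event. One way to do this across multiple workers is to claim each post ID in a shared store rather than in worker memory. This is a minimal sketch, assuming a Redis instance is available and using redis-py's `SET` with `nx=True`; the key prefix and TTL are arbitrary choices.

```python
import redis

r = redis.Redis(host="localhost", port=6379, db=0)


def is_first_time_seen(post_id: str, ttl_seconds: int = 86400) -> bool:
    """Return True only the first time a post ID is observed.

    SET with nx=True succeeds only when the key does not exist yet, so
    concurrent workers processing the same post agree on a single winner.
    The TTL lets old keys expire once duplicates can no longer arrive.
    """
    return bool(r.set(f"seen:reddit:{post_id}", 1, nx=True, ex=ttl_seconds))
```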
Streaming Ingestion
Build a resilient ingestion service that maintains continuous Reddit data flow despite rate limits and API interruptions.
```python
import asyncio
import json
import logging
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from typing import AsyncIterator, Dict, List, Optional, Set

import aiohttp


@dataclass
class RedditPost:
    id: str
    subreddit: str
    title: str
    selftext: str
    author: str
    score: int
    created_utc: float
    url: str
    num_comments: int


class RealTimeIngestion:
    """
    Real-time Reddit ingestion with:
    - Concurrent subreddit polling
    - Automatic rate limit handling
    - Deduplication
    - Graceful error recovery
    """

    def __init__(
        self,
        client_id: str,
        client_secret: str,
        subreddits: List[str],
        poll_interval: int = 30,
        max_concurrent: int = 5
    ):
        self.client_id = client_id
        self.client_secret = client_secret
        self.subreddits = subreddits
        self.poll_interval = poll_interval
        self.max_concurrent = max_concurrent
        self.seen_ids: Set[str] = set()
        self.session: Optional[aiohttp.ClientSession] = None
        self.access_token: Optional[str] = None
        self.token_expires: Optional[datetime] = None
        self.logger = logging.getLogger(__name__)

        # Rate limiting state
        self.remaining_requests = 100
        self.reset_time = datetime.now()

    async def start(self):
        """Initialize session and authenticate."""
        self.session = aiohttp.ClientSession()
        await self._authenticate()

    async def stop(self):
        """Clean shutdown."""
        if self.session:
            await self.session.close()

    async def _authenticate(self):
        """Get OAuth2 access token."""
        auth = aiohttp.BasicAuth(self.client_id, self.client_secret)
        data = {'grant_type': 'client_credentials'}

        async with self.session.post(
            'https://www.reddit.com/api/v1/access_token',
            auth=auth,
            data=data,
            headers={'User-Agent': 'RealTimeMonitor/1.0'}
        ) as resp:
            result = await resp.json()
            self.access_token = result['access_token']
            self.token_expires = datetime.now() + timedelta(
                seconds=result['expires_in'] - 60
            )

    async def _ensure_token(self):
        """Refresh token if expired."""
        if datetime.now() >= self.token_expires:
            await self._authenticate()

    async def _handle_rate_limit(self, headers: Dict):
        """Update rate limit state from response headers."""
        # Reddit reports these values as decimals (e.g. "99.0"), so parse with float()
        self.remaining_requests = float(
            headers.get('X-Ratelimit-Remaining', 100)
        )
        reset_seconds = float(
            headers.get('X-Ratelimit-Reset', 60)
        )
        self.reset_time = datetime.now() + timedelta(seconds=reset_seconds)

        if self.remaining_requests < 10:
            wait_time = (self.reset_time - datetime.now()).total_seconds()
            self.logger.warning(f"Rate limit low, waiting {wait_time:.0f}s")
            await asyncio.sleep(max(1, wait_time))

    async def fetch_subreddit(
        self,
        subreddit: str
    ) -> List[RedditPost]:
        """Fetch latest posts from a subreddit."""
        await self._ensure_token()

        url = f"https://oauth.reddit.com/r/{subreddit}/new"
        headers = {
            'Authorization': f'Bearer {self.access_token}',
            'User-Agent': 'RealTimeMonitor/1.0'
        }

        try:
            async with self.session.get(
                url, headers=headers, params={'limit': 25}
            ) as resp:
                await self._handle_rate_limit(resp.headers)

                if resp.status == 429:
                    await asyncio.sleep(60)
                    return []

                data = await resp.json()

                posts = []
                for child in data['data']['children']:
                    post_data = child['data']
                    post = RedditPost(
                        id=post_data['id'],
                        subreddit=post_data['subreddit'],
                        title=post_data['title'],
                        selftext=post_data.get('selftext', ''),
                        author=post_data['author'],
                        score=post_data['score'],
                        created_utc=post_data['created_utc'],
                        url=post_data['url'],
                        num_comments=post_data['num_comments']
                    )
                    posts.append(post)

                return posts

        except Exception as e:
            self.logger.error(f"Error fetching {subreddit}: {e}")
            return []

    async def stream(self) -> AsyncIterator[RedditPost]:
        """
        Stream new posts from monitored subreddits.
        Yields only new posts (deduped by ID).
        """
        await self.start()

        try:
            while True:
                # Fetch subreddits concurrently, max_concurrent at a time
                for i in range(0, len(self.subreddits), self.max_concurrent):
                    batch = self.subreddits[i:i + self.max_concurrent]
                    tasks = [self.fetch_subreddit(s) for s in batch]
                    results = await asyncio.gather(*tasks)

                    for posts in results:
                        for post in posts:
                            if post.id not in self.seen_ids:
                                self.seen_ids.add(post.id)
                                yield post

                # Trim seen_ids to prevent unbounded memory growth
                if len(self.seen_ids) > 50000:
                    self.seen_ids = set(list(self.seen_ids)[-25000:])

                await asyncio.sleep(self.poll_interval)
        finally:
            await self.stop()


# Usage
async def main():
    ingestion = RealTimeIngestion(
        client_id='your_client_id',
        client_secret='your_secret',
        subreddits=['technology', 'programming', 'startups'],
        poll_interval=30
    )

    async for post in ingestion.stream():
        print(f"[{post.subreddit}] {post.title[:50]}")
        # Send to message queue
        await publish_to_queue(post)


asyncio.run(main())
```
Real-Time Classification
Classify incoming posts for relevance, sentiment, and category as they arrive. Use lightweight models for speed.
```python
import re
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class ClassificationResult:
    is_relevant: bool
    relevance_score: float
    sentiment: str
    sentiment_score: float
    categories: List[str]
    matched_keywords: List[str]
    priority: str  # critical, high, medium, low


class RealTimeClassifier:
    """
    Fast classification for real-time processing.
    Uses rule-based matching + lightweight ML for speed.
    Targets <10ms per classification.
    """

    def __init__(
        self,
        brand_keywords: List[str],
        competitor_keywords: List[str] = None,
        crisis_keywords: List[str] = None,
        sentiment_model=None
    ):
        # Build regex patterns for fast matching
        self.brand_pattern = self._build_pattern(brand_keywords)
        self.competitor_pattern = self._build_pattern(
            competitor_keywords or []
        )
        self.crisis_pattern = self._build_pattern(
            crisis_keywords or [
                'scam', 'fraud', 'lawsuit', 'breach',
                'hack', 'class action'
            ]
        )
        self.sentiment_model = sentiment_model
        self.brand_keywords = brand_keywords

    def _build_pattern(self, keywords: List[str]) -> Optional[re.Pattern]:
        """Build compiled regex pattern for keywords."""
        if not keywords:
            return None
        pattern = '|'.join(
            re.escape(kw.lower()) for kw in keywords
        )
        return re.compile(rf'\b({pattern})\b', re.IGNORECASE)

    def _match_keywords(
        self,
        text: str,
        pattern: Optional[re.Pattern]
    ) -> List[str]:
        """Find all matching keywords in text."""
        if not pattern:
            return []
        return pattern.findall(text.lower())

    def _quick_sentiment(self, text: str) -> Tuple[str, float]:
        """Fast rule-based sentiment for latency-critical paths."""
        text_lower = text.lower()

        positive_words = [
            'love', 'great', 'amazing', 'excellent',
            'awesome', 'fantastic', 'recommend'
        ]
        negative_words = [
            'hate', 'terrible', 'awful', 'worst',
            'horrible', 'avoid', 'scam'
        ]

        pos_count = sum(1 for w in positive_words if w in text_lower)
        neg_count = sum(1 for w in negative_words if w in text_lower)

        if pos_count > neg_count:
            score = min(1.0, 0.5 + pos_count * 0.1)
            return 'positive', score
        elif neg_count > pos_count:
            score = min(1.0, 0.5 + neg_count * 0.1)
            return 'negative', score
        else:
            return 'neutral', 0.5

    def classify(self, post: RedditPost) -> ClassificationResult:
        """Classify a post in real-time."""
        text = f"{post.title} {post.selftext}"

        # Keyword matching
        brand_matches = self._match_keywords(text, self.brand_pattern)
        competitor_matches = self._match_keywords(text, self.competitor_pattern)
        crisis_matches = self._match_keywords(text, self.crisis_pattern)

        # Determine relevance
        is_relevant = bool(brand_matches or competitor_matches)
        relevance_score = min(1.0, len(brand_matches) * 0.3)

        # Sentiment analysis
        if self.sentiment_model and is_relevant:
            # Use ML model for relevant posts
            result = self.sentiment_model.predict([text])[0]
            sentiment = result['label']
            sentiment_score = result['confidence']
        else:
            # Quick rule-based sentiment otherwise
            sentiment, sentiment_score = self._quick_sentiment(text)

        # Categorization
        categories = []
        if brand_matches:
            categories.append('brand_mention')
        if competitor_matches:
            categories.append('competitor_mention')
        if crisis_matches:
            categories.append('crisis_signal')
        if '?' in post.title:
            categories.append('question')

        # Priority determination
        if crisis_matches and brand_matches:
            priority = 'critical'
        elif brand_matches and sentiment == 'negative':
            priority = 'high'
        elif brand_matches:
            priority = 'medium'
        else:
            priority = 'low'

        return ClassificationResult(
            is_relevant=is_relevant,
            relevance_score=relevance_score,
            sentiment=sentiment,
            sentiment_score=sentiment_score,
            categories=categories,
            matched_keywords=brand_matches + competitor_matches,
            priority=priority
        )
```
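The classifier only assumes a `sentiment_model` object with a `predict()` method returning dicts containing `label` and `confidence`. A minimal adapter is sketched below, assuming the Hugging Face `transformers` library is installed; the DistilBERT SST-2 checkpoint named here is one common default, not a requirement, and the character-level truncation is a crude guard against very long posts.

```python
from transformers import pipeline


class TransformersSentimentModel:
    """Thin wrapper exposing the predict() interface the classifier expects."""

    def __init__(self, model_name: str = "distilbert-base-uncased-finetuned-sst-2-english"):
        self._pipe = pipeline("sentiment-analysis", model=model_name)

    def predict(self, texts):
        # Crudely truncate to keep inputs within typical model limits
        results = self._pipe([t[:512] for t in texts])
        return [
            {
                "label": r["label"].lower(),      # 'positive' / 'negative'
                "confidence": float(r["score"]),
            }
            for r in results
        ]


# classifier = RealTimeClassifier(brand_keywords=['acme'],
#                                 sentiment_model=TransformersSentimentModel())
```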
Alert Engine
Define flexible alert rules that trigger on specific conditions. Support both instant alerts and threshold-based triggers.
Too many alerts desensitize teams. Use tiered severity, aggregate similar alerts, and implement cooldown periods. Target <10 actionable alerts per day for most monitoring scenarios.
```python
import asyncio
from collections import deque
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable, Dict, List, Optional


@dataclass
class AlertRule:
    name: str
    condition: Callable  # Returns True if alert should fire
    severity: str        # critical, warning, info
    cooldown_minutes: int = 15
    channels: List[str] = None  # slack, email, sms


@dataclass
class Alert:
    rule_name: str
    severity: str
    title: str
    description: str
    post: RedditPost
    classification: ClassificationResult
    triggered_at: datetime


class AlertEngine:
    """
    Real-time alert engine with:
    - Configurable alert rules
    - Cooldown periods
    - Multi-channel delivery
    - Windowed threshold alerts
    """

    def __init__(self):
        self.rules: List[AlertRule] = []
        self.last_fired: Dict[str, datetime] = {}
        self.alert_handlers: Dict[str, Callable] = {}
        # Sliding window for threshold alerts
        self.windows: Dict[str, deque] = {}

    def add_rule(self, rule: AlertRule):
        """Register an alert rule."""
        self.rules.append(rule)

    def register_handler(self, channel: str, handler: Callable):
        """Register alert delivery handler for a channel."""
        self.alert_handlers[channel] = handler

    def _is_in_cooldown(self, rule: AlertRule) -> bool:
        """Check if rule is in cooldown period."""
        if rule.name not in self.last_fired:
            return False
        cooldown_end = self.last_fired[rule.name] + timedelta(
            minutes=rule.cooldown_minutes
        )
        return datetime.now() < cooldown_end

    async def evaluate(
        self,
        post: RedditPost,
        classification: ClassificationResult
    ) -> List[Alert]:
        """Evaluate all rules against a classified post."""
        alerts = []

        for rule in self.rules:
            if self._is_in_cooldown(rule):
                continue

            try:
                if rule.condition(post, classification):
                    alert = Alert(
                        rule_name=rule.name,
                        severity=rule.severity,
                        title=f"[{rule.severity.upper()}] {rule.name}",
                        description=self._build_description(post, classification),
                        post=post,
                        classification=classification,
                        triggered_at=datetime.now()
                    )
                    alerts.append(alert)
                    self.last_fired[rule.name] = datetime.now()

                    # Deliver alert
                    await self._deliver_alert(alert, rule.channels or ['slack'])
            except Exception as e:
                print(f"Error evaluating rule {rule.name}: {e}")

        return alerts

    def _build_description(
        self,
        post: RedditPost,
        classification: ClassificationResult
    ) -> str:
        """Build alert description."""
        return f"""
**Subreddit:** r/{post.subreddit}
**Title:** {post.title}
**Sentiment:** {classification.sentiment} ({classification.sentiment_score:.2f})
**Keywords:** {', '.join(classification.matched_keywords)}
**Link:** https://reddit.com{post.url}
""".strip()

    async def _deliver_alert(
        self,
        alert: Alert,
        channels: List[str]
    ):
        """Send alert to specified channels."""
        for channel in channels:
            if channel in self.alert_handlers:
                try:
                    await self.alert_handlers[channel](alert)
                except Exception as e:
                    print(f"Failed to deliver to {channel}: {e}")


# Example alert rules
engine = AlertEngine()

# Crisis alert - immediate
engine.add_rule(AlertRule(
    name="Crisis Detection",
    condition=lambda p, c: 'crisis_signal' in c.categories and c.priority == 'critical',
    severity="critical",
    cooldown_minutes=5,
    channels=['slack', 'sms']
))

# Negative sentiment alert
engine.add_rule(AlertRule(
    name="Negative Brand Mention",
    condition=lambda p, c: (
        c.is_relevant
        and c.sentiment == 'negative'
        and c.sentiment_score > 0.7
    ),
    severity="warning",
    cooldown_minutes=15,
    channels=['slack']
))

# High engagement alert
engine.add_rule(AlertRule(
    name="Viral Post",
    condition=lambda p, c: c.is_relevant and p.score > 1000,
    severity="info",
    cooldown_minutes=60,
    channels=['slack']
))
```
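The engine reserves `self.windows` for threshold alerts, but the per-post rules above never use it. One illustrative way to add a volume-based trigger is sketched below, under the assumption that `record_for_window()` is called for every classified post; the 15-minute window and the threshold of 5 negative mentions are arbitrary starting points, not recommendations.

```python
class WindowedAlertEngine(AlertEngine):
    """Extends the engine with a simple sliding-window volume trigger (illustrative)."""

    def record_for_window(
        self,
        post: RedditPost,
        classification: ClassificationResult,
        window_minutes: int = 15,
        threshold: int = 5,
    ) -> bool:
        """Return True when negative brand mentions within the window reach the threshold."""
        if not (classification.is_relevant and classification.sentiment == 'negative'):
            return False

        window = self.windows.setdefault('negative_brand_mentions', deque())
        now = datetime.now()
        window.append(now)

        # Drop timestamps that have fallen out of the window
        cutoff = now - timedelta(minutes=window_minutes)
        while window and window[0] < cutoff:
            window.popleft()

        return len(window) >= threshold
```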
Real-Time Monitoring Without Infrastructure
reddapi.dev provides instant Reddit search with built-in sentiment analysis. Get alerts through our API without building pipelines.
Alert Delivery Integrations
Connect your alert engine to communication platforms for immediate notification.
```python
import aiohttp


async def slack_handler(alert: Alert):
    """Send alert to Slack webhook."""
    webhook_url = "https://hooks.slack.com/services/..."

    color = {
        'critical': '#dc2626',
        'warning': '#f59e0b',
        'info': '#3b82f6'
    }.get(alert.severity, '#6b7280')

    payload = {
        "attachments": [{
            "color": color,
            "title": alert.title,
            "text": alert.description,
            "footer": f"Triggered at {alert.triggered_at.isoformat()}",
            "actions": [{
                "type": "button",
                "text": "View Post",
                "url": f"https://reddit.com{alert.post.url}"
            }]
        }]
    }

    async with aiohttp.ClientSession() as session:
        await session.post(webhook_url, json=payload)


# Register handlers
engine.register_handler('slack', slack_handler)
```
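Finally, a sketch of wiring the pieces from this guide into one process: ingestion feeds the classifier, and every relevant post is evaluated against the alert rules registered earlier. The credentials and keyword lists shown are placeholders, and this single-process loop stands in for the queue-based processing layer in the reference architecture.

```python
import asyncio


async def run_monitor():
    ingestion = RealTimeIngestion(
        client_id='your_client_id',       # placeholder credentials
        client_secret='your_secret',
        subreddits=['technology', 'programming', 'startups'],
        poll_interval=30
    )
    classifier = RealTimeClassifier(
        brand_keywords=['yourbrand'],     # placeholder keyword lists
        competitor_keywords=['competitor']
    )

    # Classify each new post and let the alert engine decide what fires
    async for post in ingestion.stream():
        classification = classifier.classify(post)
        if classification.is_relevant:
            await engine.evaluate(post, classification)


asyncio.run(run_monitor())
```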