Introduction
In Chapter 10, we discussed batch processing: running a job on a bounded dataset. Now we explore stream processing: processing unbounded data that arrives continuously.
Stream: Data that is made available incrementally over time. Examples:
- User activity events on a website
- Sensor readings from IoT devices
- Stock price updates
- Log messages from servers
1. Transmitting Event Streams
An event is a small, self-contained, immutable object containing details of something that happened.
Events are written to a topic or stream, and consumers read from it.
Message Brokers vs Event Logs
Traditional message broker (RabbitMQ):
- Messages deleted after acknowledgment
- Supports complex routing
- Typically lower throughput than a log-based broker
Log-based message broker (Kafka):
- Messages retained (configurable)
- Simple sequential reads
- High throughput
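The difference between the two broker styles can be sketched with two small in-memory classes. This is an illustration only, not how RabbitMQ or Kafka are implemented; the class and method names (`QueueBroker`, `LogBroker`, `poll`, `seek`) are hypothetical.

```python
class QueueBroker:
    """Traditional broker sketch: a message is deleted once it is acknowledged."""

    def __init__(self):
        self._pending = {}   # message id -> message, not yet acknowledged
        self._next_id = 0

    def publish(self, message):
        self._pending[self._next_id] = message
        self._next_id += 1

    def receive(self):
        """Return (id, message) for the oldest unacknowledged message, or None."""
        if not self._pending:
            return None
        msg_id = min(self._pending)
        return msg_id, self._pending[msg_id]

    def ack(self, msg_id):
        # Acknowledgment removes the message for good.
        del self._pending[msg_id]


class LogBroker:
    """Log-based broker sketch: messages are retained; each consumer tracks an offset."""

    def __init__(self):
        self._log = []       # append-only sequence of messages
        self._offsets = {}   # consumer name -> next offset to read

    def publish(self, message):
        self._log.append(message)
        return len(self._log) - 1   # offset of the new message

    def poll(self, consumer):
        """Return the next message for this consumer, or None if it is caught up."""
        offset = self._offsets.get(consumer, 0)
        if offset >= len(self._log):
            return None
        self._offsets[consumer] = offset + 1
        return self._log[offset]

    def seek(self, consumer, offset):
        # Rewinding the offset replays old messages, since nothing was deleted.
        self._offsets[consumer] = offset
```

Because the log keeps every message, a second consumer (or a rewound one) sees the full history, which the queue-style broker cannot offer after acknowledgment.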
Apache Kafka Architecture
Key concepts:
Partition: Ordered, immutable sequence of records
# Kafka producer example
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Send event to partition based on user_id
event = {
    'timestamp': '2024-01-15T10:30:00',
    'user_id': 12345,
    'event_type': 'page_view',
    'page': '/products'
}

# Messages with the same key go to the same partition
# (ordering is guaranteed within a partition, not across partitions)
producer.send('user_events', key=str(event['user_id']).encode(), value=event)
Consumer groups: Multiple consumers share the work of reading a topic; each partition is assigned to one consumer in the group at a time.
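How keys map to partitions, and partitions to the consumers in a group, can be sketched as follows. This is a simplified illustration: the function names are hypothetical, CRC32 stands in for the hash (Kafka's default partitioner uses murmur2), and the round-robin assignment is only one possible strategy.

```python
import zlib


def partition_for(key: bytes, num_partitions: int) -> int:
    """Map a key to a partition with a stable hash, so equal keys share a partition."""
    return zlib.crc32(key) % num_partitions


def assign_partitions(num_partitions, consumers):
    """Spread partitions over the consumers of one group, round-robin."""
    assignment = {consumer: [] for consumer in consumers}
    for partition in range(num_partitions):
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment
```

For example, `assign_partitions(4, ["a", "b"])` gives consumer `a` partitions 0 and 2 and consumer `b` partitions 1 and 3, so every partition has exactly one reader in the group.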
Log Compaction
Problem: Logs grow forever
Solution: Keep only the latest value for each key
Use case: Maintaining database state in a log
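# Example: Changelog for user database
# (reuses `producer` and `json` from the producer example above)
from kafka import KafkaConsumer

def publish_user_update(user_id, user_data):
    """Publish user update to compacted topic"""
    producer.send(
        'user_changelog',
        key=str(user_id).encode(),
        value=user_data
    )

# Consumer can rebuild entire user database from log
def rebuild_user_database():
    """Consume compacted log to build local cache"""
    consumer = KafkaConsumer(
        'user_changelog',
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='earliest',  # Read the log from the beginning
        enable_auto_commit=False,
        consumer_timeout_ms=5000       # Stop iterating when no new messages arrive
    )
    user_db = {}
    for message in consumer:
        user_id = message.key.decode()
        if message.value is None:
            # Tombstone: the key was deleted
            user_db.pop(user_id, None)
        else:
            user_db[user_id] = json.loads(message.value)
    return user_db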
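What compaction does to the log itself can be sketched as a pure function over a list of `(key, value)` records. This is a simplification under stated assumptions: `compact` is a hypothetical name, a value of `None` stands for a tombstone, and tombstones are dropped immediately (a real broker usually retains them for some time).

```python
def compact(log):
    """Return a compacted copy of `log`, a list of (key, value) records.

    Only the most recent record for each key is kept, in its original
    relative order. Keys whose latest record is a tombstone (None) vanish.
    """
    latest_index = {}
    for index, (key, _value) in enumerate(log):
        latest_index[key] = index   # later records overwrite earlier ones

    return [
        (key, value)
        for index, (key, value) in enumerate(log)
        if latest_index[key] == index and value is not None
    ]
```

Replaying the compacted log yields the same final state as replaying the full log, which is why a compacted topic can serve as a snapshot of a database.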
2. Databases and Streams
Key insight: A database can be viewed as a stream of changes.
Change Data Capture (CDC)
CDC: Observe changes written to a database and replicate them to other systems.
Implementation approaches:
Example: Debezium (CDC tool)
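The replication side of CDC can be sketched as applying a sequence of change events to a derived copy, such as a cache or search index. The event shape used here (`op`, `key`, `row`) is a hypothetical one chosen for illustration; it is not Debezium's actual event format.

```python
def apply_change(replica, change):
    """Apply one change event to a derived copy of the table (a dict here)."""
    if change["op"] == "delete":
        replica.pop(change["key"], None)
    else:
        # Inserts and updates are both treated as "set the row for this key".
        replica[change["key"]] = change["row"]


def replicate(changes, replica=None):
    """Replay change events in log order and return the resulting copy."""
    replica = {} if replica is None else replica
    for change in changes:
        apply_change(replica, change)
    return replica
```

As long as the changes are applied in the order the database wrote them, the derived copy converges to the same contents as the source table.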
Event Sourcing
Event sourcing: Store all changes as immutable events, derive current state.
Benefits:
Example:
# Event sourcing for shopping cart
from datetime import datetime

class ShoppingCartEvent:
    pass

class ItemAdded(ShoppingCartEvent):
    def __init__(self, item_id, quantity):
        self.item_id = item_id
        self.quantity = quantity
        self.timestamp = datetime.now()

class ItemRemoved(ShoppingCartEvent):
    def __init__(self, item_id):
        self.item_id = item_id
        self.timestamp = datetime.now()

class ShoppingCart:
    def __init__(self, events=None):
        self.items = {}
        # Rebuild state from events (None avoids a shared mutable default)
        for event in events or []:
            self.apply(event)

    def apply(self, event):
        """Apply event to current state"""
        if isinstance(event, ItemAdded):
            if event.item_id in self.items:
                self.items[event.item_id] += event.quantity
            else:
                self.items[event.item_id] = event.quantity
        elif isinstance(event, ItemRemoved):
            self.items.pop(event.item_id, None)

    def add_item(self, item_id, quantity):
        """Add item by creating event"""
        event = ItemAdded(item_id, quantity)
        event_store.append(event)  # Persist event
        self.apply(event)          # Update local state

# Rebuild cart from event history
# (event_store, cart_id, and specific_time are assumed to be provided by the application)
events = event_store.get_events_for_cart(cart_id)
cart = ShoppingCart(events)

# Time travel: State at specific time
events_until = [e for e in events if e.timestamp < specific_time]
past_cart = ShoppingCart(events_until)
3. Processing Streams
Three main types of stream processing:
Complex Event Processing
Goal: Search for patterns in event streams
Example pattern queries:
-- Illustrative CEP query, written in the style of Esper's query language
SELECT *
FROM LoginEvent.win:time(5 min) AS login,
     PurchaseEvent.win:time(5 min) AS purchase
WHERE login.user_id = purchase.user_id
  AND login.country != purchase.country
  AND purchase.timestamp - login.timestamp < 5 minutes
# Simplified Python implementation of the same pattern
# (assumes each event has 'user_id', 'type', 'country', and a Unix 'timestamp' in seconds)
from collections import deque
import time

class FraudDetector:
    def __init__(self):
        self.recent_events = {}  # user_id -> deque of events

    def process_event(self, event):
        user_id = event['user_id']
        if user_id not in self.recent_events:
            self.recent_events[user_id] = deque()
        events = self.recent_events[user_id]
        # Add current event
        events.append(event)
        # Remove events older than 5 minutes
        cutoff = time.time() - 300
        while events and events[0]['timestamp'] < cutoff:
            events.popleft()
        # Check for suspicious pattern
        if self._is_suspicious(events):
            self.raise_alert(user_id, events)

    def _is_suspicious(self, events):
        """Check if events match fraud pattern"""
        if len(events) < 2:
            return False
        # Find login and purchase events
        logins = [e for e in events if e['type'] == 'login']
        purchases = [e for e in events if e['type'] == 'purchase']
        for login in logins:
            for purchase in purchases:
                time_diff = purchase['timestamp'] - login['timestamp']
                if (0 < time_diff < 300 and
                        login['country'] != purchase['country']):
                    return True
        return False

    def raise_alert(self, user_id, events):
        """Placeholder alert hook; a real system would notify a fraud team"""
        print(f"Possible fraud for user {user_id}: {len(events)} recent events")
Stream Analytics
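Aggregations over time windows.
Tumbling window: Fixed length, non-overlapping; every event belongs to exactly one window.
Hopping window: Fixed length, overlapping; a new window starts every hop interval, which is shorter than the window length.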
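Window assignment can be sketched as a pure function of an event's timestamp. The function names are hypothetical, timestamps and sizes are plain numbers in the same unit (seconds, say), and windows are half-open intervals `[start, end)` aligned to multiples of the size or hop.

```python
def tumbling_window(timestamp, size):
    """Return the single [start, end) window of length `size` containing timestamp."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


def hopping_windows(timestamp, size, hop):
    """Return every [start, end) window of length `size` that contains timestamp.

    Windows start at multiples of `hop`; with hop < size they overlap,
    so one event falls into several windows.
    """
    windows = []
    start = timestamp - (timestamp % hop)   # latest window start <= timestamp
    while start > timestamp - size:         # window still covers the timestamp
        windows.append((start, start + size))
        start -= hop
    return sorted(windows)
```

With `hop == size`, `hopping_windows` returns exactly the one tumbling window, which shows that a tumbling window is the non-overlapping special case.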
Stream analytics example:
# Apache Flink example: Count page views per minute
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)

# Define source (Kafka); the 'format' option assumes JSON-encoded messages
table_env.execute_sql("""
    CREATE TABLE page_views (
        user_id BIGINT,
        page STRING,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'page_views',
        'properties.bootstrap.servers' = 'localhost:9092',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
""")

# Tumbling window aggregation; print() streams the results to stdout
table_env.execute_sql("""
    SELECT
        page,
        TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
        COUNT(*) as view_count
    FROM page_views
    GROUP BY
        page,
        TUMBLE(event_time, INTERVAL '1' MINUTE)
""").print()
Time in Stream Processing
Challenge: Events may arrive out of order
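Two notions of time: event time (when the event actually occurred) and processing time (when the stream processor handles it).
Watermarks: Indicate progress in event time; a watermark with timestamp t signals that no more events with an event time earlier than t are expected.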
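One common way to produce watermarks is to trail the highest event time seen so far by a fixed delay. The sketch below illustrates that idea with a hypothetical class; it is not Flink's implementation, and timestamps are plain numbers in seconds.

```python
class BoundedDelayWatermark:
    """Watermark = highest event time seen so far minus a fixed allowed delay."""

    def __init__(self, max_delay):
        self.max_delay = max_delay
        self.max_event_time = None

    def watermark(self):
        """Current watermark; negative infinity before any event has been seen."""
        if self.max_event_time is None:
            return float("-inf")
        return self.max_event_time - self.max_delay

    def observe(self, event_time):
        """Record an event; return True if it arrived behind the watermark (late)."""
        late = event_time < self.watermark()
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return late
```

A larger `max_delay` tolerates more disorder but delays window results, since a window can only be closed once the watermark passes its end.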
Handling late events:
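# Flink SQL: tolerating late events
# Standard Flink SQL has no per-query lateness clause. An event is counted if it
# arrives before the watermark passes the end of its window, so the tolerance for
# late events is set by the watermark delay in the table definition, for example:
#   WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE
# (In the DataStream API, windows can additionally be given an allowed lateness.)
table_env.execute_sql("""
    SELECT
        page,
        TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
        COUNT(*) as view_count
    FROM page_views
    GROUP BY
        page,
        TUMBLE(event_time, INTERVAL '1' MINUTE)
""").print()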
4. Stream Joins
Joining streams is more complex than joining tables.
Stream-Stream Join (Window Join)
Example: Match each ad impression with the clicks on the same ad that fall within a time window.
Implementation:
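# Flink stream-stream join (interval join)
# Assumes a PyFlink version that provides KeyedStream.interval_join and
# streams that already have event-time timestamps assigned
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import ProcessJoinFunction
from pyflink.common.time import Time

class JoinImpressionWithClick(ProcessJoinFunction):
    """Emit one joined record per matching (impression, click) pair"""
    def process_element(self, imp, click, ctx):
        yield {
            'ad_id': imp['ad_id'],
            'user_id': imp['user_id'],
            'impression_time': imp['time'],
            'click_time': click['time'],
            'time_to_click': click['time'] - imp['time']
        }

env = StreamExecutionEnvironment.get_execution_environment()
impressions = env.add_source(...)  # Kafka source
clicks = env.add_source(...)       # Kafka source

# Join each impression with clicks that occur up to 10 minutes after it
joined = impressions \
    .key_by(lambda imp: imp['ad_id']) \
    .interval_join(clicks.key_by(lambda click: click['ad_id'])) \
    .between(Time.minutes(0), Time.minutes(10)) \
    .process(JoinImpressionWithClick())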
Stream-Table Join
Join stream events with database table (enrichment).
Challenge: Database table changes over time
Solutions:
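One approach is to keep a local copy of the table inside the stream processor and update it from the table's changelog, so that each event is enriched with the table state at the time it is processed. The sketch below assumes a hypothetical `EnrichmentJoin` class, a user profile table keyed by `user_id`, and `None` as the marker for a deleted row.

```python
class EnrichmentJoin:
    """Stream-table join: enrich events from a local copy of the table."""

    def __init__(self):
        self.profiles = {}   # local copy of the table: user_id -> profile

    def on_table_change(self, user_id, profile):
        """Apply one changelog record from the table (None means deleted)."""
        if profile is None:
            self.profiles.pop(user_id, None)
        else:
            self.profiles[user_id] = profile

    def on_event(self, event):
        """Return the event with the current profile attached (None if unknown)."""
        return {**event, "profile": self.profiles.get(event["user_id"])}
```

Because lookups hit local state rather than a remote database, the join adds no network round trip per event; the cost is that the result depends on the order in which table changes and events are interleaved.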
Table-Table Join
Both inputs are changelogs (CDC streams from databases).
Example:
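A table-table join can be sketched as a materialized view that is updated whenever either changelog delivers a change. The class below is a hypothetical illustration for two tables joined on a shared key (an inner join), with `None` standing for a deleted row.

```python
class JoinedView:
    """Materialized inner join of two keyed tables, maintained from their changelogs."""

    def __init__(self):
        self.left = {}    # current contents of the left table
        self.right = {}   # current contents of the right table
        self.view = {}    # join result: key -> (left row, right row)

    def _apply(self, table, key, value):
        # A value of None represents a deletion in this sketch.
        if value is None:
            table.pop(key, None)
        else:
            table[key] = value
        # Recompute only the affected row of the join result.
        if key in self.left and key in self.right:
            self.view[key] = (self.left[key], self.right[key])
        else:
            self.view.pop(key, None)

    def on_left_change(self, key, value):
        self._apply(self.left, key, value)

    def on_right_change(self, key, value):
        self._apply(self.right, key, value)
```

Each change touches only the rows of the view that share its key, so the view stays current without re-running the whole join.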
5. Fault Tolerance
Stream processors must handle failures gracefully.
Microbatching
Approach: Break stream into small batches (Spark Streaming)
Advantages:
- Can use batch processing techniques
- Easier fault tolerance (batch atomicity)
Disadvantages:
- Higher latency (wait for batch)
- Not true streaming
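The microbatching idea can be sketched as a generator that cuts an unbounded iterator into small lists. This is a simplification: the hypothetical `micro_batches` function groups by record count, whereas Spark Streaming cuts batches by time interval.

```python
from itertools import islice


def micro_batches(stream, batch_size):
    """Yield consecutive lists of up to `batch_size` records from an iterable."""
    iterator = iter(stream)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return          # the source is exhausted
        yield batch         # each batch can be processed (and retried) as a unit
```

The latency cost is visible here: the first record of a batch is not processed until the batch is full.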
Checkpointing
Approach: Periodically save complete state (Flink)
Flink checkpointing:
Exactly-once semantics:
Implementing exactly-once:
# Example: Idempotent writes
# (`database` is assumed to be provided by the application)
class IdempotentWriter:
    def __init__(self):
        # Track written messages; in production this set must itself be
        # stored durably, or it is lost on restart
        self.written_ids = set()

    def write(self, message_id, data):
        """Write only if not already written"""
        if message_id in self.written_ids:
            # Already written, skip (duplicate)
            return False
        # Write to database/storage
        database.write(data)
        # Track that we wrote it
        self.written_ids.add(message_id)
        return True

# Flink two-phase commit for exactly-once
# 1. Write output to a temporary location
# 2. Pre-commit: when the checkpoint barrier arrives, flush the pending output
# 3. Commit: once the checkpoint completes, atomically move temp -> final
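The pre-commit and commit steps can be sketched with a file sink that stages output in a temporary file and publishes it with an atomic rename. This is an illustration of the pattern only, not Flink's sink API; the class name and the `.pending` suffix are hypothetical.

```python
import os


class TwoPhaseFileSink:
    """Stage records in a temporary file, then publish them atomically."""

    def __init__(self, final_path):
        self.final_path = final_path
        self.temp_path = final_path + ".pending"
        self._buffer = []

    def write(self, record):
        self._buffer.append(record)

    def pre_commit(self):
        """Phase 1: make the pending output durable, but not yet visible."""
        with open(self.temp_path, "w") as f:
            f.writelines(record + "\n" for record in self._buffer)
            f.flush()
            os.fsync(f.fileno())

    def commit(self):
        """Phase 2: atomically move the pending output to its final location."""
        os.replace(self.temp_path, self.final_path)
        self._buffer = []

    def abort(self):
        """Discard pending output, for example after a failed checkpoint."""
        if os.path.exists(self.temp_path):
            os.remove(self.temp_path)
        self._buffer = []
```

Readers of `final_path` see either the complete output or none of it, so a crash between the two phases never exposes partial results.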
Idempotence
Idempotent operation: Can be performed multiple times with the same effect as performing it once.
Making operations idempotent:
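# get_inventory, update_inventory, order_processed, and mark_order_processed
# are assumed to be provided by the application

# Non-idempotent: every retry decrements the inventory again
def process_order(order_id, product_id):
    inventory = get_inventory(product_id)
    update_inventory(product_id, inventory - 1)  # Decrement

# Idempotent version
def process_order_idempotent(order_id, product_id):
    # Check if already processed
    if order_processed(order_id):
        return  # Skip duplicate
    inventory = get_inventory(product_id)
    update_inventory(product_id, inventory - 1)
    # Mark as processed (ideally in the same transaction as the inventory update)
    mark_order_processed(order_id)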
6. Stream Processing Frameworks Comparison
Performance comparison:
Summary
Key Takeaways:
- Event logs are fundamental:
  - Durable, ordered, partitioned
  - Kafka pioneered log-based messaging
  - Enable replay and multiple consumers
- Time is complex in streams:
  - Event time vs processing time
  - Watermarks indicate progress
  - Late events require special handling
- Windowing enables aggregations:
  - Tumbling: fixed, non-overlapping
  - Hopping: fixed, overlapping
  - Sliding: continuous
  - Session: based on inactivity
- Joins are more complex than batch:
  - Stream-stream: within time window
  - Stream-table: lookup enrichment
  - Table-table: maintain materialized view
- Fault tolerance is critical:
  - Exactly-once semantics ideal
  - Checkpointing saves state
  - Idempotence simplifies recovery
- Different frameworks, different trade-offs:
  - Flink: True streaming, low latency
  - Spark: Unified batch/stream, microbatching
  - Kafka Streams: Simple, Kafka-native
Comparison table:
| Aspect | Batch Processing | Stream Processing |
|---|---|---|
| Input | Bounded (complete dataset) | Unbounded (continuous) |
| Latency | Minutes to hours | Milliseconds to seconds |
| Results | Complete, final | Continuous, approximate |
| State | Materialized to disk | In-memory with checkpoints |
| Time | Processing time only | Event time + processing time |
| Failures | Retry entire job | Checkpoint and replay |
| Use cases | Daily reports, ML training | Fraud detection, monitoring |