Production Ready¶
The TechFlow Inc. story continues: from prototype to deployment
After successfully completing the legacy data migration, the team at TechFlow Inc. was ready for the next step. Sarah called a technical planning meeting.
"Our prototype works perfectly," she said. "But before we deploy to production, we need to ensure it's robust, performant, and capable of handling incidents."
Lucas, the systems architect, spoke up. "We need automatic backups, proper concurrency management, and complete observability. Not to mention performance optimization."
"Exactly," Sarah agreed. "Let's start from the beginning."
BackupManager Setup¶
"The first rule of production: always have backups," Lucas reminded everyone.
Basic Configuration¶
from pathlib import Path
from dictdb import DictDB, BackupManager
# Initialize the database
db = DictDB()
db.create_table("users", primary_key="id")
db.create_table("sessions", primary_key="session_id")
db.create_table("events", primary_key="event_id")
# Configure backup directory
BACKUP_DIR = Path("./data/backups")
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
# Create the backup manager
backup_manager = BackupManager(
db=db,
backup_dir=BACKUP_DIR,
backup_interval=300, # Backup every 5 minutes
file_format="json", # Human-readable format for debugging
min_backup_interval=10.0, # Minimum 10s between triggered backups
)
# Start automatic backups
backup_manager.start()
print("Backup manager started")
# ... Application running ...
# On application shutdown
backup_manager.backup_full() # Final complete backup
backup_manager.stop()
print("Backup manager stopped gracefully")
Handling Backup Failures¶
"What happens if a backup fails?" Tom asked.
import logging
from dictdb import DictDB, BackupManager
# Standard logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("techflow.backup")
def on_backup_failure(error: Exception, consecutive_failures: int):
"""
Callback invoked when a backup fails.
:param error: The exception that caused the failure
:param consecutive_failures: Number of consecutive failures
"""
logger.error(f"Backup failure ({consecutive_failures}x): {error}")
# Progressive alerts based on severity
if consecutive_failures == 1:
logger.warning("First backup failure - monitoring active")
elif consecutive_failures == 3:
logger.error("3 consecutive failures - check disk space")
# send_admin_email("Backup Alerts", str(error))
elif consecutive_failures >= 5:
logger.critical("CRITICAL: 5+ failures - intervention required!")
# send_oncall_sms("Backups in critical failure")
db = DictDB()
db.create_table("critical_data")
backup_manager = BackupManager(
db=db,
backup_dir="./backups",
backup_interval=60,
on_backup_failure=on_backup_failure,
)
backup_manager.start()
# Monitor status
print(f"Consecutive failures: {backup_manager.consecutive_failures}")
Incremental Backups¶
"Full backups take too long with millions of records," Lucas noted. "Let's switch to incremental backups."
from dictdb import DictDB, BackupManager
db = DictDB()
db.create_table("transactions", primary_key="tx_id")
# Incremental mode: saves only changes
backup_manager = BackupManager(
db=db,
backup_dir="./backups",
backup_interval=60, # Every minute
incremental=True, # Enable incremental mode
max_deltas_before_full=10, # Full backup every 10 deltas
)
backup_manager.start()
# Insert data
transactions = db.get_table("transactions")
for i in range(100):
transactions.insert({"tx_id": f"TX{i:05d}", "amount": i * 10.0})
# Trigger an incremental backup after a large batch
backup_manager.notify_change()
# Check the number of deltas since last full backup
print(f"Deltas since last full backup: {backup_manager.deltas_since_full}")
# Force a full backup (compaction)
backup_manager.backup_full()
print(f"Deltas after compaction: {backup_manager.deltas_since_full}") # 0
Backup File Structure¶
backups/
dictdb_backup_1706123456_789012.json # Full backup
dictdb_delta_1706123516_123456.json # Delta 1
dictdb_delta_1706123576_234567.json # Delta 2
dictdb_delta_1706123636_345678.json # Delta 3
dictdb_backup_1706123696_456789.json # New full backup (compaction)
Restoring from Incremental Backups¶
from pathlib import Path
from dictdb import DictDB
from dictdb.storage.persist import apply_delta
BACKUP_DIR = Path("./backups")
def restore_database():
"""Restore the database from backups."""
# Find the latest full backup
full_backups = sorted(BACKUP_DIR.glob("dictdb_backup_*.json"))
if not full_backups:
raise FileNotFoundError("No full backup found")
latest_full = full_backups[-1]
print(f"Loading full backup: {latest_full.name}")
# Load the full backup
db = DictDB.load(str(latest_full), "json")
# Extract the timestamp from the full backup
# Format: dictdb_backup_1706123456_789012.json
backup_timestamp = latest_full.stem.replace("dictdb_backup_", "")
# Apply subsequent deltas
deltas = sorted(BACKUP_DIR.glob("dictdb_delta_*.json"))
for delta_file in deltas:
delta_timestamp = delta_file.stem.replace("dictdb_delta_", "")
if delta_timestamp > backup_timestamp:
affected = apply_delta(db, delta_file)
print(f"Delta applied: {delta_file.name} ({affected} records)")
return db
# Restore
restored_db = restore_database()
print(f"Database restored: {len(restored_db.list_tables())} tables")
Thread-Safety Patterns¶
"Our application will receive requests from multiple threads simultaneously," Sarah explained. "How do we handle that?"
Concurrent Reads¶
import threading
from dictdb import DictDB
db = DictDB()
db.create_table("products")
products = db.get_table("products")
# Populate with data
for i in range(1000):
products.insert({"name": f"Product {i}", "price": i * 9.99, "stock": i % 100})
def reader_thread(thread_id: int):
"""Function executed by each reader thread."""
# Multiple threads can read simultaneously
results = products.select(
where=products.price > 500,
order_by="-price",
limit=10,
)
print(f"Thread {thread_id}: found {len(results)} expensive products")
# Launch 10 reader threads in parallel
threads = [threading.Thread(target=reader_thread, args=(i,)) for i in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print("All reads completed")
Concurrent Writes¶
import threading
from dictdb import DictDB
db = DictDB()
db.create_table("counters", primary_key="name")
counters = db.get_table("counters")
# Initialize a counter
counters.insert({"name": "visits", "value": 0})
def increment_counter(thread_id: int, iterations: int):
    """Increment the counter (naive version)."""
    for _ in range(iterations):
        # Each DictDB call is thread-safe on its own, but the read nested
        # inside the update argument and the update are two separate steps
        counters.update(
            {"value": counters.select(where=counters.name == "visits")[0]["value"] + 1},
            where=counters.name == "visits",
        )

# Note: this read-modify-write sequence has a race condition!
# Let's see the correct method...
# A module-level lock serializes the whole read-modify-write sequence
counter_lock = threading.Lock()

def increment_counter_safe(db_instance: DictDB, thread_id: int, iterations: int):
    """
    Thread-safe version: an external lock makes the read + update atomic.
    Individual DictDB calls are thread-safe, but the sequence is not.
    """
    counters = db_instance.get_table("counters")
    for _ in range(iterations):
        with counter_lock:
            # Use copy=True to avoid modifications to shared references
            current = counters.select(where=counters.name == "visits", copy=True)
            if current:
                new_value = current[0]["value"] + 1
                counters.update({"value": new_value}, where=counters.name == "visits")
Producer-Consumer Pattern¶
import threading
import queue
import time
from dictdb import DictDB
db = DictDB()
db.create_table("tasks", primary_key="id")
db.create_table("results", primary_key="task_id")
tasks = db.get_table("tasks")
results = db.get_table("results")
# Queue for coordination
task_queue = queue.Queue()
shutdown = threading.Event()
def producer():
"""Create tasks and add them to the queue."""
for i in range(100):
task_id = tasks.insert({"id": i, "status": "pending", "data": f"task_{i}"})
task_queue.put(task_id)
time.sleep(0.01) # Simulate delay between tasks
# Signal completion
shutdown.set()
print("Producer finished")
def consumer(worker_id: int):
"""Process tasks from the queue."""
while not shutdown.is_set() or not task_queue.empty():
try:
task_id = task_queue.get(timeout=0.1)
except queue.Empty:
continue
# Update status (atomic)
tasks.update({"status": "in_progress"}, where=tasks.id == task_id)
# Simulate processing
time.sleep(0.05)
# Record the result
results.insert({
"task_id": task_id,
"worker": worker_id,
"result": "success",
})
# Mark as completed
tasks.update({"status": "completed"}, where=tasks.id == task_id)
task_queue.task_done()
print(f"Consumer {worker_id} finished")
# Start threads
prod = threading.Thread(target=producer)
consumers = [threading.Thread(target=consumer, args=(i,)) for i in range(3)]
prod.start()
for c in consumers:
c.start()
prod.join()
for c in consumers:
c.join()
print(f"Tasks processed: {results.count()}")
Async Save/Load Operations¶
"For our async web API, we cannot block on I/O," Tom noted.
import asyncio
from dictdb import DictDB
async def main():
# Create and populate the database
db = DictDB()
db.create_table("users")
users = db.get_table("users")
for i in range(1000):
users.insert({"name": f"User{i}", "email": f"user{i}@example.com"})
# Async save - does not block the event loop
print("Starting async save...")
await db.async_save("backup_async.json", "json")
print("Save completed")
# Async load
print("Async loading...")
db_loaded = await DictDB.async_load("backup_async.json", "json")
print(f"Loaded {db_loaded.get_table('users').count()} users")
# In an async application (FastAPI, aiohttp, etc.)
asyncio.run(main())
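async_save() and async_load() cover persistence, but other blocking DictDB calls (a large select, a manual backup_full()) can also be kept off the event loop with the standard library's asyncio.to_thread (Python 3.9+). A minimal sketch:

import asyncio
from dictdb import DictDB

async def heavy_read(db: DictDB) -> int:
    """Run a blocking select in a worker thread so the event loop stays responsive."""
    users = db.get_table("users")
    results = await asyncio.to_thread(users.select, copy=True)
    return len(results)

# Inside an async handler: count = await heavy_read(db)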
FastAPI Integration¶
from fastapi import FastAPI, BackgroundTasks
from dictdb import DictDB, BackupManager
app = FastAPI()
db = DictDB()
db.create_table("api_logs", primary_key="id")
logs = db.get_table("api_logs")
# BackupManager in the background
backup_manager = BackupManager(db, "./backups", backup_interval=60, incremental=True)
@app.on_event("startup")
async def startup():
backup_manager.start()
@app.on_event("shutdown")
async def shutdown():
backup_manager.backup_full()
backup_manager.stop()
@app.post("/log")
async def create_log(message: str, background_tasks: BackgroundTasks):
log_id = logs.insert({"message": message, "timestamp": "2024-01-20T10:00:00"})
# Save in background after insertion
background_tasks.add_task(backup_manager.notify_change)
return {"id": log_id}
@app.get("/backup/status")
async def backup_status():
return {
"consecutive_failures": backup_manager.consecutive_failures,
"deltas_since_full": backup_manager.deltas_since_full,
}
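Recent FastAPI releases deprecate @app.on_event in favour of a lifespan context manager; the startup/shutdown logic above can be expressed equivalently as follows (a sketch reusing backup_manager):

from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    backup_manager.start()        # runs at startup
    yield
    backup_manager.backup_full()  # runs at shutdown
    backup_manager.stop()

app = FastAPI(lifespan=lifespan)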
Logging Configuration¶
"Observability is crucial in production," Lucas stated. "Let's configure logging properly."
Basic Configuration¶
from dictdb import DictDB, configure_logging
# Simple configuration: logs to stdout at INFO level
configure_logging(level="INFO", console=True)
db = DictDB() # Will print: "Initialized an empty DictDB instance."
db.create_table("test") # Will print: "Created table 'test' (pk='id')."
Advanced Configuration for Production¶
from dictdb import configure_logging
# Production configuration:
# - INFO level and above on the console
# - All logs to a file
# - JSON format for aggregation
configure_logging(
level="INFO",
console=True,
logfile="./logs/dictdb.log",
json=True, # JSON format for ELK/Splunk/etc.
)
DEBUG Log Sampling¶
from dictdb import configure_logging
# In production with high traffic, sample DEBUG logs
# to avoid drowning important logs
configure_logging(
level="DEBUG",
console=True,
logfile="./logs/dictdb_debug.log",
sample_debug_every=100, # Only log 1 DEBUG out of 100
)
Custom Logging with Filters¶
from dictdb import logger
# Remove default handlers
logger.remove()
# Add a file handler for data-modification operations, selected by a filter
logger.add(
sink="./logs/operations.log",
level="INFO",
filter=lambda record: record["extra"].get("op") in ("INSERT", "UPDATE", "DELETE"),
)
# Add a separate handler for errors
logger.add(
sink="./logs/errors.log",
level="ERROR",
)
# Logs will be automatically routed to the appropriate file
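The filter above relies on an op field in each record's extra dict. Assuming the dictdb logger follows the loguru-style API suggested by add()/remove(), application code can tag its own messages the same way with bind():

from dictdb import logger

# Tag application-level messages so the filter routes them to operations.log
logger.bind(op="INSERT").info("Inserted 250 records into 'users'")
logger.bind(op="DELETE").warning("Purged expired sessions")

# Untagged messages are only captured by the errors.log handler at ERROR level
logger.error("Disk almost full on backup volume")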
Error Handling Best Practices¶
"In production, errors happen," Sarah philosophized. "What matters is handling them correctly."
from dictdb import (
DictDB,
DictDBError,
DuplicateKeyError,
DuplicateTableError,
RecordNotFoundError,
SchemaValidationError,
TableNotFoundError,
)
def handle_operation_robustly(db: DictDB, operation: str, **kwargs):
"""
Execute an operation with complete error handling.
"""
try:
if operation == "insert":
table = db.get_table(kwargs["table"])
return table.insert(kwargs["record"])
elif operation == "update":
table = db.get_table(kwargs["table"])
return table.update(kwargs["changes"], where=kwargs.get("where"))
elif operation == "delete":
table = db.get_table(kwargs["table"])
return table.delete(where=kwargs.get("where"))
except TableNotFoundError as e:
# Table doesn't exist - maybe create it automatically?
print(f"Table not found: {e}")
raise
except DuplicateKeyError as e:
# Primary key already exists
print(f"Duplicate detected: {e}")
# Option: use upsert instead
raise
except SchemaValidationError as e:
# Data doesn't match the schema
print(f"Validation error: {e}")
raise
except RecordNotFoundError as e:
# No record matches the criteria
print(f"Record not found: {e}")
return 0 # Not a fatal error, just no modifications
except DictDBError as e:
# Generic DictDB error
print(f"DictDB error: {e}")
raise
except Exception as e:
# Unexpected error
print(f"Unexpected error: {type(e).__name__}: {e}")
raise
# Usage
db = DictDB()
db.create_table("products", primary_key="sku")
try:
handle_operation_robustly(
db,
"insert",
table="products",
record={"sku": "ABC123", "name": "Widget", "price": 29.99},
)
except DuplicateKeyError:
# Handle the duplicate
pass
Retry Pattern with Exponential Backoff¶
import time
import random
from functools import wraps
from dictdb import DictDBError
def with_retry(max_attempts: int = 3, backoff_base: float = 1.0):
"""Decorator to retry an operation on failure."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except DictDBError as e:
last_exception = e
if attempt < max_attempts - 1:
# Exponential backoff with jitter
delay = backoff_base * (2 ** attempt) + random.uniform(0, 1)
print(f"Attempt {attempt + 1} failed, retrying in {delay:.2f}s")
time.sleep(delay)
raise last_exception
return wrapper
return decorator
@with_retry(max_attempts=3)
def insert_with_retry(table, record):
"""Insert a record with automatic retry."""
return table.insert(record)
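Calling the decorated helper then looks exactly like a plain insert; if all attempts fail, the last DictDBError is re-raised to the caller. A short usage sketch:

from dictdb import DictDB, DictDBError

db = DictDB()
db.create_table("jobs", primary_key="id")
jobs = db.get_table("jobs")

try:
    insert_with_retry(jobs, {"id": 1, "payload": "nightly-report"})
except DictDBError as exc:
    # Three attempts failed; surface the last error to the caller
    print(f"Giving up after retries: {exc}")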
Performance Tips¶
"Now, let's optimize for performance," Lucas declared.
1. Use Indexes Wisely¶
from dictdb import DictDB
db = DictDB()
db.create_table("transactions", primary_key="id")
transactions = db.get_table("transactions")
# Create indexes on frequently queried columns
transactions.create_index("customer_id", index_type="hash") # O(1) for equality
transactions.create_index("amount", index_type="sorted") # O(log n) for ranges
transactions.create_index("date", index_type="sorted") # For ORDER BY
# Populate with data
for i in range(100000):
transactions.insert({
"customer_id": i % 1000,
"amount": i * 0.99,
"date": f"2024-01-{(i % 28) + 1:02d}",
"status": "active" if i % 2 == 0 else "inactive",
})
# Fast query using hash index on customer_id
import time
start = time.time()
results = transactions.select(where=transactions.customer_id == 42)
print(f"Search by customer_id: {len(results)} results in {time.time() - start:.4f}s")
# Range query using sorted index on amount
start = time.time()
results = transactions.select(where=transactions.amount.between(1000, 2000))
print(f"Search by amount range: {len(results)} results in {time.time() - start:.4f}s")
2. Use copy=False for Read-Only Operations¶
from dictdb import DictDB
db = DictDB()
db.create_table("logs", primary_key="id")
logs = db.get_table("logs")
# Populate
for i in range(50000):
logs.insert({"message": f"Log entry {i}", "level": "INFO"})
import time
# With copy (default, safer)
start = time.time()
results_copied = logs.select(limit=10000, copy=True)
print(f"With copy=True: {time.time() - start:.4f}s")
# Without copy (faster, read-only)
start = time.time()
results_refs = logs.select(limit=10000, copy=False)
print(f"With copy=False: {time.time() - start:.4f}s")
# WARNING: never modify results with copy=False!
# results_refs[0]["message"] = "Modified" # DANGEROUS!
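If one of the records fetched with copy=False does need to change, take an explicit copy of just that record and persist the change through update(), so the table and its indexes stay consistent; a sketch reusing logs and results_refs from above:

import copy

# Copy only the record you intend to change, never mutate the shared reference
record = copy.deepcopy(results_refs[0])
record["level"] = "DEBUG"
logs.update({"level": "DEBUG"}, where=logs.message == record["message"])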
3. Batch Inserts¶
from dictdb import DictDB
import time
db = DictDB()
db.create_table("events", primary_key="id")
events = db.get_table("events")
# Bad practice: individual inserts
data = [{"type": "click", "page": f"/page/{i}"} for i in range(10000)]
start = time.time()
for record in data:
events.insert(record)
print(f"Individual inserts: {time.time() - start:.4f}s")
# Reset
events.delete()
# Good practice: batch insert
start = time.time()
events.insert(data) # Single operation
print(f"Batch insert: {time.time() - start:.4f}s")
# With explicit batches for very large volumes
start = time.time()
events.delete()
events.insert(data, batch_size=1000) # Process in batches of 1000
print(f"Batch insert with batch_size: {time.time() - start:.4f}s")
4. Use skip_validation for Trusted Data¶
from dictdb import DictDB
db = DictDB()
db.create_table("imported_data", primary_key="id")
data_table = db.get_table("imported_data")
# Data from a trusted source (internal API, validated file, etc.)
trusted_data = [
{"id": i, "value": i * 10}
for i in range(100000)
]
import time
# With validation (safer, slower)
start = time.time()
data_table.insert(trusted_data)
print(f"With validation: {time.time() - start:.4f}s")
data_table.delete()
# Without validation (faster, only for trusted data!)
start = time.time()
data_table.insert(trusted_data, skip_validation=True)
print(f"Without validation: {time.time() - start:.4f}s")
5. Limit Results with LIMIT and Optimized ORDER BY¶
from dictdb import DictDB
db = DictDB()
db.create_table("scores", primary_key="id")
scores = db.get_table("scores")
# Index on the sort field
scores.create_index("points", index_type="sorted")
# Populate
for i in range(100000):
scores.insert({"player": f"Player{i}", "points": i * 7 % 10000})
import time
# Get top 10 - DictDB automatically optimizes with heapq
start = time.time()
top10 = scores.select(
order_by="-points", # Descending sort
limit=10,
)
print(f"Top 10 in {time.time() - start:.4f}s")
for i, score in enumerate(top10, 1):
print(f" {i}. {score['player']}: {score['points']} points")
Complete Example: Production-Ready Application¶
Here's a complete example integrating all best practices:
"""
Production-ready DictDB application.
"""
import asyncio
import threading
import time
from pathlib import Path
from contextlib import contextmanager
from dictdb import (
DictDB,
BackupManager,
configure_logging,
DictDBError,
RecordNotFoundError,
)
class ProductionApp:
"""DictDB application configured for production."""
def __init__(self, data_dir: str = "./data"):
self.data_dir = Path(data_dir)
self.data_dir.mkdir(parents=True, exist_ok=True)
# Configure logging
configure_logging(
level="INFO",
console=True,
logfile=str(self.data_dir / "app.log"),
json=False,
)
# Initialize or load database
self.db = self._initialize_or_load()
# Configure backup manager
self.backup_manager = BackupManager(
db=self.db,
backup_dir=self.data_dir / "backups",
backup_interval=300,
file_format="json",
min_backup_interval=30.0,
on_backup_failure=self._on_backup_failure,
incremental=True,
max_deltas_before_full=20,
)
# Statistics
self._stats = {
"operations": 0,
"errors": 0,
"started_at": time.time(),
}
def _initialize_or_load(self) -> DictDB:
"""Load an existing backup or create a new database."""
backup_dir = self.data_dir / "backups"
if backup_dir.exists():
backups = sorted(backup_dir.glob("dictdb_backup_*.json"))
if backups:
latest = backups[-1]
print(f"Loading backup: {latest.name}")
return DictDB.load(str(latest), "json")
# Create a new database with required tables
print("Creating new database")
db = DictDB()
db.create_table("users", primary_key="id")
db.create_table("sessions", primary_key="session_id")
db.create_table("logs", primary_key="id")
# Create indexes for frequent queries
users = db.get_table("users")
users.create_index("email", index_type="hash")
sessions = db.get_table("sessions")
sessions.create_index("user_id", index_type="hash")
sessions.create_index("expire_at", index_type="sorted")
return db
def _on_backup_failure(self, error: Exception, count: int):
"""Callback for backup failures."""
self._stats["errors"] += 1
print(f"ALERT: Backup failure ({count}x): {error}")
if count >= 5:
print("CRITICAL: Multiple backup failures!")
# Here: send alert to operations
def start(self):
"""Start the application."""
print("Starting application...")
self.backup_manager.start()
print("Application started")
def stop(self):
"""Stop the application gracefully."""
print("Stopping application...")
# Final backup
self.backup_manager.backup_full()
self.backup_manager.stop()
# Save final state
self.db.save(str(self.data_dir / "final_state.json"), "json")
print("Application stopped gracefully")
self._display_stats()
def _display_stats(self):
"""Display application statistics."""
duration = time.time() - self._stats["started_at"]
print(f"\n--- Statistics ---")
print(f"Runtime: {duration:.1f}s")
print(f"Operations performed: {self._stats['operations']}")
print(f"Errors: {self._stats['errors']}")
print(f"Backup failures: {self.backup_manager.consecutive_failures}")
@contextmanager
def operation(self, name: str):
"""Context manager to trace operations."""
start = time.time()
try:
yield
self._stats["operations"] += 1
except DictDBError as e:
self._stats["errors"] += 1
print(f"Error in {name}: {e}")
raise
finally:
duration = time.time() - start
if duration > 0.1: # Log if over 100ms
print(f"Slow operation: {name} ({duration:.3f}s)")
def create_user(self, email: str, name: str) -> int:
"""Create a new user."""
with self.operation("create_user"):
users = self.db.get_table("users")
return users.insert({
"email": email,
"name": name,
"created_at": time.strftime("%Y-%m-%d %H:%M:%S"),
})
def get_user(self, user_id: int) -> dict:
"""Retrieve a user by ID."""
with self.operation("get_user"):
users = self.db.get_table("users")
results = users.select(
where=users.id == user_id,
copy=True, # Safe copy
)
if not results:
raise RecordNotFoundError(f"User {user_id} not found")
return results[0]
def search_by_email(self, email: str) -> list:
"""Search for a user by email (uses index)."""
with self.operation("search_by_email"):
users = self.db.get_table("users")
return users.select(where=users.email == email)
# Entry point
def main():
app = ProductionApp("./production_data")
try:
app.start()
# Simulate operations
for i in range(100):
user_id = app.create_user(
email=f"user{i}@example.com",
name=f"User {i}",
)
# Retrieve the user
user = app.get_user(user_id)
# Search by email (uses index)
results = app.search_by_email(user["email"])
if i % 10 == 0:
print(f"Progress: {i}/100 users created")
print("Operations completed successfully")
except KeyboardInterrupt:
print("\nInterrupt requested...")
finally:
app.stop()
if __name__ == "__main__":
main()
What We Learned¶
By the end of this production preparation, the TechFlow Inc. team now masters:
- BackupManager: Configure periodic backups with failure callbacks and status monitoring.
- Incremental backups: Reduce I/O by saving only changes, with automatic compaction after N deltas.
- Thread-safety: Understand DictDB's concurrency model (reader-writer lock per table) and use it correctly in multi-threaded applications.
- Async operations: Use async_save() and async_load() to avoid blocking the event loop in asynchronous applications.
- Logging: Configure log level, outputs (console, file), JSON format for aggregation, and DEBUG log sampling.
- Error handling: Catch specific DictDB exceptions and implement retry patterns with exponential backoff.
- Performance optimizations:
    - Hash indexes for equality searches
    - Sorted indexes for ranges and sorting
    - copy=False for read-only operations
    - Batch inserts instead of individual inserts
    - skip_validation=True for trusted data
    - limit and order_by optimized with heapq
"Our application is now ready to face production," Sarah concluded with satisfaction. "We have robust backups, proper concurrency handling, and logs to diagnose issues."
Lucas nodded. "And we have the optimizations needed to handle scale. TechFlow Inc. can deploy with confidence."
End of the TechFlow Inc. saga. May your databases always be backed up and your indexes well chosen.