Prêt pour la production¶
La suite de l'histoire de TechFlow SARL : du prototype au déploiement
Après avoir réussi la migration des données historiques, l'équipe de TechFlow SARL était prête pour l'étape suivante. Sarah convoqua une réunion de planification technique.
« Notre prototype fonctionne à merveille », commença-t-elle. « Mais avant de passer en production, nous devons nous assurer que notre système est robuste, performant et capable de surmonter n'importe quel incident. »
Lucas, l'architecte systèmes, intervint : « Il nous faut des sauvegardes automatiques, une gestion rigoureuse de la concurrence et une observabilité totale. Sans oublier l'optimisation des performances. »
« Exactement », acquiesça Sarah. « Commençons par le commencement. »
Configuration du BackupManager¶
« La règle d'or en production : ne jamais faire l'impasse sur les sauvegardes », rappela Lucas.
Configuration de base¶
from pathlib import Path
from dictdb import DictDB, BackupManager
# Initialiser la base de données
db = DictDB()
db.create_table("users", primary_key="id")
db.create_table("sessions", primary_key="session_id")
db.create_table("events", primary_key="event_id")
# Configurer le répertoire de sauvegarde
BACKUP_DIR = Path("./data/backups")
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
# Créer le gestionnaire de sauvegardes
backup_manager = BackupManager(
db=db,
backup_dir=BACKUP_DIR,
backup_interval=300, # Sauvegarde toutes les 5 minutes
file_format="json", # Format lisible pour faciliter le débogage
min_backup_interval=10.0, # Minimum 10s entre deux sauvegardes déclenchées
)
# Démarrer les sauvegardes automatiques
backup_manager.start()
print("Gestionnaire de sauvegardes démarré")
# ... L'application tourne ...
# À la fermeture de l'application
backup_manager.backup_full() # Une dernière sauvegarde complète
backup_manager.stop()
print("Gestionnaire de sauvegardes arrêté proprement")
Gestion des échecs de sauvegarde¶
« Et si une sauvegarde échoue ? », demanda Thomas.
import logging
from dictdb import DictDB, BackupManager
# Configuration standard du logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("techflow.backup")
def on_backup_failure(error: Exception, consecutive_failures: int):
"""
Callback appelé en cas d'échec de sauvegarde.
"""
logger.error(f"Échec de sauvegarde ({consecutive_failures}x) : {error}")
# Alertes progressives selon la gravité
if consecutive_failures == 1:
logger.warning("Premier échec - surveillance active")
elif consecutive_failures == 3:
logger.error("3 échecs consécutifs - vérifiez l'espace disque")
# envoyer_email_admin("Alerte Sauvegardes", str(error))
elif consecutive_failures >= 5:
logger.critical("CRITIQUE : 5+ échecs - intervention immédiate requise !")
# envoyer_sms_astreinte("Sauvegardes en échec critique")
db = DictDB()
db.create_table("donnees_critiques")
backup_manager = BackupManager(
db=db,
backup_dir="./backups",
backup_interval=60,
on_backup_failure=on_backup_failure,
)
backup_manager.start()
# Surveiller l'état
print(f"Échecs consécutifs : {backup_manager.consecutive_failures}")
Sauvegardes incrémentales¶
« Les sauvegardes complètes vont devenir trop lourdes avec des millions d'enregistrements », nota Lucas. « Passons au mode incrémental. »
from dictdb import DictDB, BackupManager
db = DictDB()
db.create_table("transactions", primary_key="tx_id")
# Mode incrémental : on ne sauvegarde que les changements
backup_manager = BackupManager(
db=db,
backup_dir="./backups",
backup_interval=60, # Chaque minute
incremental=True, # Activer le mode incrémental
max_deltas_before_full=10, # Une sauvegarde complète tous les 10 deltas
)
backup_manager.start()
# Insérer des données
transactions = db.get_table("transactions")
for i in range(100):
transactions.insert({"tx_id": f"TX{i:05d}", "amount": i * 10.0})
# Déclencher une sauvegarde incrémentale après un gros lot
backup_manager.notify_change()
# Vérifier le nombre de deltas depuis la dernière sauvegarde complète
print(f"Deltas accumulés : {backup_manager.deltas_since_full}")
# Forcer une sauvegarde complète (compactage)
backup_manager.backup_full()
print(f"Deltas après compactage : {backup_manager.deltas_since_full}") # 0
Structure des fichiers de sauvegarde¶
backups/
dictdb_backup_1706123456_789012.json # Sauvegarde complète
dictdb_delta_1706123516_123456.json # Delta 1
dictdb_delta_1706123576_234567.json # Delta 2
dictdb_delta_1706123636_345678.json # Delta 3
dictdb_backup_1706123696_456789.json # Nouvelle sauvegarde complète (compactage)
Restauration à partir de deltas¶
from pathlib import Path
from dictdb import DictDB
from dictdb.storage.persist import apply_delta
BACKUP_DIR = Path("./backups")
def restaurer_base():
"""Restaure la base de données à partir des sauvegardes."""
# Trouver la dernière sauvegarde complète
full_backups = sorted(BACKUP_DIR.glob("dictdb_backup_*.json"))
if not full_backups:
raise FileNotFoundError("Aucune sauvegarde complète trouvée")
latest_full = full_backups[-1]
print(f"Chargement de la base complète : {latest_full.name}")
# Charger la base complète
db = DictDB.load(str(latest_full), "json")
# Extraire le timestamp de la sauvegarde
backup_timestamp = latest_full.stem.replace("dictdb_backup_", "")
# Appliquer les deltas suivants
deltas = sorted(BACKUP_DIR.glob("dictdb_delta_*.json"))
for delta_file in deltas:
delta_timestamp = delta_file.stem.replace("dictdb_delta_", "")
if delta_timestamp > backup_timestamp:
affected = apply_delta(db, delta_file)
print(f"Delta appliqué : {delta_file.name} ({affected} records)")
return db
# Exécuter la restauration
ma_base = restaurer_base()
print(f"Base restaurée : {len(ma_base.list_tables())} tables")
Gestion de la concurrence (Thread-Safety)¶
« Notre application va recevoir des requêtes de plusieurs threads en même temps », expliqua Sarah. « Comment s'assurer que tout reste cohérent ? »
Lectures concurrentes¶
import threading
from dictdb import DictDB
db = DictDB()
db.create_table("products")
products = db.get_table("products")
# Remplissage initial
for i in range(1000):
products.insert({"name": f"Produit {i}", "price": i * 9.99, "stock": i % 100})
def lecteur_thread(thread_id: int):
"""Fonction exécutée par chaque thread lecteur."""
# Plusieurs threads peuvent lire en même temps sans se bloquer
results = products.select(
where=products.price > 500,
order_by="-price",
limit=10,
)
print(f"Thread {thread_id} : trouvé {len(results)} produits chers")
# Lancer 10 threads de lecture en parallèle
threads = [threading.Thread(target=lecteur_thread, args=(i,)) for i in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print("Toutes les lectures sont terminées")
Écritures concurrentes¶
import threading
import time
from dictdb import DictDB
db = DictDB()
db.create_table("counters", primary_key="name")
counters = db.get_table("counters")
# Initialiser un compteur
counters.insert({"name": "visites", "value": 0})
def incrementer_compteur_safe(db_instance: DictDB, iterations: int):
"""
Version sûre utilisant des copies pour éviter les effets de bord.
"""
table = db_instance.get_table("counters")
for _ in range(iterations):
# Lire et mettre à jour de manière atomique
# Utiliser copy=True pour avoir ses propres données
current = table.select(where=table.name == "visites", copy=True)
if current:
new_value = current[0]["value"] + 1
table.update({"value": new_value}, where=table.name == "visites")
Opérations asynchrones¶
« Pour notre API web, nous ne pouvons pas nous permettre de bloquer sur les entrées/sorties », nota Thomas.
import asyncio
from dictdb import DictDB
async def main():
# Créer et remplir la base
db = DictDB()
db.create_table("users")
users = db.get_table("users")
for i in range(1000):
users.insert({"name": f"User{i}", "email": f"user{i}@example.com"})
# Sauvegarde asynchrone - ne bloque pas la boucle d'événements
print("Démarrage de la sauvegarde asynchrone...")
await db.async_save("backup_async.json", "json")
print("Sauvegarde terminée")
# Chargement asynchrone
print("Chargement asynchrone...")
db_loaded = await DictDB.async_load("backup_async.json", "json")
print(f"Chargé {db_loaded.get_table('users').count()} utilisateurs")
# Dans une application asynchrone (FastAPI, aiohttp, etc.)
asyncio.run(main())
Configuration avancée des logs¶
« L'observabilité est cruciale », déclara Lucas. « Configurons nos logs pour la production. »
from dictdb import configure_logging
# Configuration de production :
# - Niveau INFO pour la console (erreurs visibles)
# - Tous les logs dans un fichier
# - Format JSON pour l'agrégation (ELK, Splunk, etc.)
configure_logging(
level="INFO",
console=True,
logfile="./logs/dictdb.log",
json=True,
)
Échantillonnage des logs DEBUG¶
from dictdb import configure_logging
# En production avec beaucoup de trafic, échantillonner les logs DEBUG
# pour ne pas noyer les logs importants
configure_logging(
level="DEBUG",
console=True,
logfile="./logs/dictdb_debug.log",
sample_debug_every=100, # On ne logue qu'un DEBUG sur 100
)
Meilleures pratiques de gestion des erreurs¶
« En production, les erreurs sont inévitables », philosopha Sarah. « Ce qui compte, c'est de savoir les gérer proprement. »
from dictdb import (
DictDB,
DictDBError,
DuplicateKeyError,
RecordNotFoundError,
SchemaValidationError,
TableNotFoundError,
)
def executer_operation_robuste(db: DictDB, operation: str, **kwargs):
"""
Exécute une opération avec une gestion complète des erreurs.
"""
try:
table = db.get_table(kwargs["table"])
if operation == "insert":
return table.insert(kwargs["record"])
elif operation == "update":
return table.update(kwargs["changes"], where=kwargs.get("where"))
elif operation == "delete":
return table.delete(where=kwargs.get("where"))
except TableNotFoundError:
print(f"Erreur : Table '{kwargs['table']}' introuvable")
raise
except DuplicateKeyError:
print("Erreur : Cette clé existe déjà")
# Possibilité d'utiliser upsert ici
raise
except SchemaValidationError as e:
print(f"Erreur de validation : {e}")
raise
except RecordNotFoundError:
print("Avertissement : Aucun enregistrement trouvé")
return 0
except DictDBError as e:
print(f"Erreur DictDB : {e}")
raise
except Exception as e:
print(f"Erreur inattendue : {e}")
raise
Conseils de performance¶
« Pour finir, optimisons la vitesse », conclut Lucas.
1. Utiliser les index à bon escient¶
# Index Hash pour les recherches d'égalité exacte -> O(1)
transactions.create_index("customer_id", index_type="hash")
# Index trié pour les plages de valeurs et le tri -> O(log n)
transactions.create_index("amount", index_type="sorted")
2. Désactiver la copie pour les lectures seules¶
# Beaucoup plus rapide pour les gros volumes, mais attention :
# INTERDICTION de modifier les résultats !
results = logs.select(limit=10000, copy=False)
3. Insertions par lots (Batch)¶
# Pratique recommandée : insertion d'une liste entière
events.insert(liste_de_10000_evenements)
# Avec une taille de lot explicite pour les très gros volumes
events.insert(gros_volume, batch_size=1000)
4. Ignorer la validation pour les données sûres¶
# Si les données viennent d'une source interne déjà validée
table.insert(donnees_de_confiance, skip_validation=True)
Ce que nous avons appris¶
Grâce à cette préparation minutieuse, l'équipe de TechFlow SARL maîtrise désormais :
- BackupManager : Configurer des sauvegardes périodiques avec surveillance et alertes.
- Sauvegardes incrémentales : Économiser des E/S en n'enregistrant que les deltas.
- Thread-safety : Utiliser correctement le modèle de concurrence de DictDB.
- Opérations asynchrones : Intégrer la base de données dans une boucle d'événements non bloquante.
- Logging de production : Configurer des logs structurés (JSON) et échantillonnés.
- Gestion des erreurs : Implémenter des blocs try/except robustes pour chaque type d'exception.
- Optimisations : Utiliser les index, désactiver les copies inutiles et privilégier les traitements par lots.
« Notre application est fin prête pour le monde réel », conclut Sarah. « Nous avons la robustesse, la visibilité et la vitesse nécessaires pour réussir notre lancement. »
Fin de l'épopée TechFlow SARL. Que vos bases de données soient toujours sauvegardées et vos index judicieusement choisis !