Dès qu'une application web a besoin de déléguer une tâche lourde ou critique en arrière-plan, on se retrouve avec Redis pour stocker le résultat des tâches et parfois RabbitMQ pour gérer la messagerie. La mode est maintenant d'utiliser Redis aussi pour la messagerie, grâce à une procédure stockée en lua, ça économise le gros bousin qu'est RabbitMQ.
Postgres peut déjà remplacer Redis grâce à JSONb. Mais pour notifier, va-t-on
bourriner Postgres avec des SELECT
? Heureusement non, grâce à
LISTEN
/NOTIFY
!
LISTEN
/NOTIFY
est une extension au langage SQL pour communiquer une
information aux autres sessions ouvertes. Postgres distingue plusieurs canaux de
notification, auquel on s'abonne. Couplé avec l'appel système select, on peut
rester très efficace. Voilà à quoi ça ressemble :
Je vous propose de détailler un peu comment utiliser Postgres pour communiquer entre frontal et dorsal.
Pousser un message dans la queue
La queue des messages est une simple table où l'on stocke le message, un
identifiant et l'état du message. Une fois que le message est inséré, le frontal
notifie les dorsaux avec la commande NOTIFY
et deux paramètres : le canal et
l'identifiant du message.
INSERT INTO messages(message, state)
VALUES ('message', 'queued')
RETURNING id;
NOTIFY "queue", id;
Reste à faire quelque chose de ce message !
Consommer
Côté dorsal, il faut ouvrir une connexion permanente qui écoute les notifications, sur le même canal où le frontal émet.
LISTEN queue;
Pour utiliser ça, oubliez votre ORM. La documentation de psycopg2 propose un exemple de récupération des notifications asynchrones en Python. asyncpg implémente également cette partie du protocole.
Une fois que le dorsal a l'information, consommer le message est plus délicat.
Le message ne doit être traité qu'une fois ! Grâce à l'intégrité de Postgres, il
n'y a rien d'obscur à faire. Lors d'un UPDATE
la table est correctement
verrouillée. L'astuce est de bien préciser à Postgres de mettre à jour l'état du
message, uniquement si ce n'est pas déjà fait !
UPDATE messages
SET state = 'consumed'
WHERE id = 123 AND state = 'queued'
RETURNING message;
La clause state = 'queued'
s'assure que Postgres ne touche pas un message déjà
consommé par un autre dorsal. Postgres retourne le nombre de lignes modifiées
par une requête. Si c'est 1, le message était à queued
il faut donc le
traiter ! Si c'est 0, un autre dorsal a déjà consommé le message, il faut passer
au suivant.
La clause RETURNING message
est une sorte de SELECT
caché dans l'UPDATE
.
Postgres retourne le contenu message 123 s'il est queued
. Il ne reste qu'à
ouvrir le courrier !
Je trouve ça plus simple que d'envoyer du lua dans Redis… Merci Postgres !
Dernier point, pour récupérer les messages envoyés avant LISTEN
, un simple
SELECT WHERE state = 'queued'
après l'ouverture de la connexion fera
l'affaire.
C'est dramatiq
Côté tâches distribuées en Python, j'évite Celery. Trop lourd, avec de la magie noire et puis cette configuration par défaut qui fait qu'une tâche est considérée comme traitée même si le processus a planté…
J'ai une préférence pour dramatiq, sa légèreté, son API plus propre et sa robustesse. Dans dramatiq, une tâche s'appelle Actor, le reste de l'API est assez évident.
Bizarrement, aucun projets de tâches distribué en Python n'a d'agent basé sur
Postgres. J'ai donc créé dramatiq-pg
basé sur LISTEN
/NOTIFY
. Le projet est à son tout début, n'hésitez pas à
contribuer !