Pub-Sub avec Postgres

Dès qu'une application web a besoin de déléguer une tâche lourde ou critique en arrière-plan, on se retrouve avec Redis pour stocker le résultat des tâches et parfois RabbitMQ pour gérer la messagerie. La mode est maintenant d'utiliser Redis aussi pour la messagerie, grâce à une procédure stockée en lua, ça économise le gros bousin qu'est RabbitMQ.

Postgres peut déjà remplacer Redis grâce à JSONb. Mais pour notifier, va-t-on bourriner Postgres avec des SELECT ? Heureusement non, grâce à LISTEN/NOTIFY !

LISTEN/NOTIFY est une extension au langage SQL pour communiquer une information aux autres sessions ouvertes. Postgres distingue plusieurs canaux de notification, auquel on s'abonne. Couplé avec l'appel système select, on peut rester très efficace. Voilà à quoi ça ressemble :

Pub-Sub avec Postgres

Je vous propose de détailler un peu comment utiliser Postgres pour communiquer entre frontal et dorsal.

Pousser un message dans la queue

La queue des messages est une simple table où l'on stocke le message, un identifiant et l'état du message. Une fois que le message est inséré, le frontal notifie les dorsaux avec la commande NOTIFY et deux paramètres : le canal et l'identifiant du message.

INSERT INTO messages(message, state)
VALUES ('message', 'queued')
RETURNING id;
NOTIFY "queue", id;

Reste à faire quelque chose de ce message !

Consommer

Côté dorsal, il faut ouvrir une connexion permanente qui écoute les notifications, sur le même canal où le frontal émet.

LISTEN queue;

Pour utiliser ça, oubliez votre ORM. La documentation de psycopg2 propose un exemple de récupération des notifications asynchrones en Python. asyncpg implémente également cette partie du protocole.

Une fois que le dorsal a l'information, consommer le message est plus délicat. Le message ne doit être traité qu'une fois ! Grâce à l'intégrité de Postgres, il n'y a rien d'obscur à faire. Lors d'un UPDATE la table est correctement verrouillée. L'astuce est de bien préciser à Postgres de mettre à jour l'état du message, uniquement si ce n'est pas déjà fait !

UPDATE messages
SET state = 'consumed'
WHERE id = 123 AND state = 'queued'
RETURNING message;

La clause state = 'queued' s'assure que Postgres ne touche pas un message déjà consommé par un autre dorsal. Postgres retourne le nombre de lignes modifiées par une requête. Si c'est 1, le message était à queued il faut donc le traiter ! Si c'est 0, un autre dorsal a déjà consommé le message, il faut passer au suivant.

La clause RETURNING message est une sorte de SELECT caché dans l'UPDATE. Postgres retourne le contenu message 123 s'il est queued. Il ne reste qu'à ouvrir le courrier !

Je trouve ça plus simple que d'envoyer du lua dans Redis… Merci Postgres !

Dernier point, pour récupérer les messages envoyés avant LISTEN, un simple SELECT WHERE state = 'queued' après l'ouverture de la connexion fera l'affaire.

Trompe accrochée à une queue d'éléphant

C'est dramatiq

Côté tâches distribuées en Python, j'évite Celery. Trop lourd, avec de la magie noire et puis cette configuration par défaut qui fait qu'une tâche est considérée comme traitée même si le processus a planté…

J'ai une préférence pour dramatiq, sa légèreté, son API plus propre et sa robustesse. Dans dramatiq, une tâche s'appelle Actor, le reste de l'API est assez évident.

Bizarrement, aucun projets de tâches distribué en Python n'a d'agent basé sur Postgres. J'ai donc créé dramatiq-pg basé sur LISTEN/NOTIFY. Le projet est à son tout début, n'hésitez pas à contribuer !