Тонкости использования NOTIFY/LISTEN в PostgreSQL

29 августа 2022

Одна из возможностей PostgreSQL, которой часто пренебрегают — это NOTIFY и LISTEN. Данный механизм позволяет использовать РСУБД в роли брокера сообщений. NOTIFY/LISTEN обладает рядом полезных свойств, которых нет у прочих решений, таких, как RabbitMQ или Kafka.

Примечание: В SQL-стандарте нет никаких NOTIFY или LISTEN. Это уникальная возможность PostgreSQL.

В первом приближении NOTIFY/LISTEN не выглядит, как что-то сложное:

session1 =# LISTEN some_channel_name;
LISTEN

session2 =# NOTIFY some_channel_name, 'Hello!';
NOTIFY

session1 =# SELECT 1 as x; -- any query
 x
---
 1

Asynchronous notification "some_channel_name" with payload "Hello!" ⏎
  received from server process with PID 12345.

session1 =# UNLISTEN *;
UNLISTEN

Здесь у нас две сессии. Первая подписывается на канал. Вторая публикует в этом канале сообщение. Первая сессия получает сообщение после выполнения очередного запроса. Затем первая сессия отписывается от всех каналов.

Но приведенный пример не показывает всех возможностей NOTIFY/LISTEN. Рассмотрим ситуацию посложнее. За основу возьмем код из заметки Пример использования триггеров в PostgreSQL.

Есть таблица с какими-то данными:

CREATE TABLE data_raw(tstamp TIMESTAMP NOT NULL, val INT NOT NULL);

Повесим на нее триггер, вот такой:

CREATE OR REPLACE FUNCTION data_raw_on_insert() RETURNS TRIGGER AS $$
BEGIN
  PERFORM (
    WITH payload("timestamp", "value") AS (
      SELECT NEW.tstamp, NEW.val
    )
    SELECT pg_notify('data_raw_inserts', row_to_json(payload) :: TEXT)
    FROM payload
  );
  RETURN NULL;
END
$$ LANGUAGE 'plpgsql';

CREATE TRIGGER data_raw_trigger
AFTER INSERT ON data_raw
FOR EACH ROW EXECUTE PROCEDURE data_raw_on_insert();

При вставке новых строк соответствующие сообщения публикуются в канал data_raw_inserts с помощью pg_notify().

Проверим, как это работает. В первой сессии подпишемся на канал:

LISTEN data_raw_inserts;

Во второй — обновим таблицу:

INSERT INTO data_raw VALUES
  ('2022-01-01 00:00:00', 123),
  ('2022-01-01 00:00:00', 123), -- same as above
  ('2022-01-02 00:00:00', 456),
  ('2022-01-03 00:00:00', 789);

В первой сессии после SELECT 1 получим:

Asynchronous notification "data_raw_inserts" with payload ⏎
  "{"timestamp":"2022-01-01T00:00:00","value":123}" received ⏎
  from server process with PID 12345.

Asynchronous notification "data_raw_inserts" with payload ⏎
  "{"timestamp":"2022-01-02T00:00:00","value":456}" received ⏎
  from server process with PID 12345.

Asynchronous notification "data_raw_inserts" with payload ⏎
  "{"timestamp":"2022-01-03T00:00:00","value":789}" received ⏎
  from server process with PID 12345.

Было записано четыре строки в таблицу, но пришло только три сообщения. Все потому что PostgreSQL производит дедупликацию сообщений. Если в рамках транзакции было послано два одинаковых сообщения, слушатели канала получат только одну его копию.

Этот пример демонстрирует, как NOTIFY/LISTEN может быть использован для инвалидации кэшей. Допустим, мы хотим иметь в своем приложении кэш, содержащий копию таблицы data_raw. Тогда сценарий использования будет следующим. Приложение подключается к PostgreSQL, подписывается на data_raw_inserts, после чего делает SELECT из таблицы. Если в таблицу будут записаны новые данные, приложение получит об этом уведомление. Таблицу не нужно постоянно перечитывать. В примере мы использовали триггер только на INSERT, в предположении, что данные только дозаписываются. Но с тем же успехом можно повесить триггеры и на другие операции.

Кто-то скажет, что все то же самое можно сделать в RabbitMQ. А вот и нельзя!

Потому что NOTIFY/LISTEN в PostgreSQL, как и все остальное, являются транзакционными. Если транзакция откатится, никакие посланные в рамках нее сообщения не будут отправлены. Если закоммитится, то будут посланы обязательно. Даже если сразу после COMMIT приложение будет прибито. Или если произойдет netsplit между приложением и РСУБД, и приложение не будет знать состояние транзакции. Если приложение сделало LISTEN посреди работы другой транзакции, то по завершении транзакции приложение получит все уведомления от нее. Даже те, что были созданы до LISTEN и/или до подключения приложения к РСУБД. Наконец, все уведомления приходят не в каком-то случайном порядке, а строго в том, в котором коммитились транзакции.

Есть, конечно, у NOTIFY/LISTEN и свои ограничения.

Во-первых, если приложение подписалось на канал, и затем открыло транзакцию, то уведомления оно получет только после завершения транзакции — успешного или неуспешного. Это по-своему логично. Ведь если транзакция откатится, то РСУБД должна откатить все ее эффекты. Но нельзя так просто взять и откатить получение уведомления.

Во-вторых, размер сообщения не может превышать 8000 байт. Если требуется больше, нужно использовать вспомогательную таблицу и слать в сообщении первичный ключ этой таблицы.

И в-третьих, общий размер очереди сообщений не может превышать 8 Гб. Если попытаться выйти за этот лимит, то транзакция, делавшая NOTIFY, завершится с ошибкой при попытке сделать COMMIT. Когда очередь заполняется наполовину, PostgreSQL начинает писать в логи предупреждения с информацией о сессии, препятствующей очистке очереди. То есть, сделавшей LISTEN и ушедшей в транзакцию. Функция pg_notification_queue_usage() сообщает, насколько (от 0.0 до 1.0) сейчас заполнена очередь.

В общем и целом, NOTIFY/LISTEN — это еще один инструмент. Как и с любым нормальным инструментом, найдутся задачи, для которых он подойдет просто идеально. Но найдутся и такие, где NOTIFY/LISTEN совсем не годится. Зато подойдет, скажем, logical decoding, логическая репликация, или Kafka.

Метки: , .


Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.