← На главную

Пример использования logical decoding в PostgreSQL

Сегодня я хотел бы вкратце рассказать о возможности PostgreSQL под названием logical decoding. Данный механизм позволяет подписаться на изменения, происходящие в базе данных, и получать эти изменения в удобном для вас формате, например, в JSON. Logical decoding ни в коем случае нельзя путать с логической репликацией. Logical decoding появился в PostgreSQL намного раньше, в версии 9.4, и является механизмом, на основе которого работает логическая репликация, появившаяся в версии 10.

Представление изменений в каком-либо конкретном формате осуществляется не в самом ядре PostgreSQL, а в расширениях. Одним из подобных расширения является wal2json:

git clone https://github.com/eulerto/wal2json cd wal2json make make install

Заведем какую-нибудь таблицу:

CREATE TABLE test (k TEXT PRIMARY KEY, v TEXT);

Создадим replication slot, в котором будет использоваться wal2json:

pg_recvlogical --slot=myslot --dbname=eax --user=eax \ --create-slot --plugin=wal2json

… и начнем тянуть изменения:

pg_recvlogical --slot=myslot --dbname=eax --user=eax \ --start -f - | jq

Пример того, что приходит при вставке новых данных:

{ "change": [ { "kind": "insert", "schema": "public", "table": "test", "columnnames": [ "k", "v" ], "columntypes": [ "text", "text" ], "columnvalues": [ "aaa", "bbb" ] } ] }

При обновлении:

{ "change": [ { "kind": "update", "schema": "public", "table": "test", "columnnames": [ "k", "v" ], "columntypes": [ "text", "text" ], "columnvalues": [ "aaa", "ccc" ], "oldkeys": { "keynames": [ "k" ], "keytypes": [ "text" ], "keyvalues": [ "aaa" ] } } ] }

При удалении данных:

{ "change": [ { "kind": "delete", "schema": "public", "table": "test", "oldkeys": { "keynames": [ "k" ], "keytypes": [ "text" ], "keyvalues": [ "aaa" ] } } ] }

Само собой разумеется, если сеть порвется, при восстановлении подключения нам доедут все пропущенные изменения.

Как и следовало ожидать, для logical decoding характерны все те же ограничения, что и для логической репликации. В частности, DDL, операция truncate и sequences не реплицируются. В этих случаях нам придет просто:

{"change":[]}

По крайней мере, мы будем знать, что что-то изменилось. В крайнем случае, можно подключиться к базе данных напрямую и посмотреть, не появилось ли в ней, к примеру, новых таблиц.

Кроме того, чтобы все работало в соответствии с нашими ожиданиями, у таблиц обязательно должен быть primary key. В противном случае insert будет приходить нормально, а на update и delete мы получим:

WARNING: table "x" without primary key or replica identity is nothing {"change":[]}

Какого-либо механизма фильтрации принимаемых изменений, как в логической репликации, увы, пока не предусмотрено. Реализовать его, впрочем, не сложно, и если вы пошлете соответствующий патч автору wal2json, сомневаюсь, что он откажется его принять.

Отмечу, что на момент написания этих строк, в wal2json имелись некоторые не закрытые баги, в частности раз и два. В связи с этим может иметь смысл рассмотреть альтернативное расширение под названием jsoncdc.

Самое же стабильное расширение идет вместе с самим PostgreSQL и называется test_decoding. Однако это расширение выводит данные в формате, отличным от JSON:

BEGIN 564 table public.test: INSERT: k[text]:'aaa' v[text]:'bbb' COMMIT 564 BEGIN 565 table public.test: UPDATE: k[text]:'aaa' v[text]:'ccc' COMMIT 565 BEGIN 566 table public.test: DELETE: k[text]:'aaa' COMMIT 566

Соответственно, такой формат несколько труднее парсить, и в самопальном парсере на регулярках или чем-то таком непременно найдутся свои баги. Так что, я лично советовал бы остановиться на wal2json.

Дополнение: Также вас может заинтересовать статья Тонкости использования NOTIFY/LISTEN в PostgreSQL.