Расширения PostgreSQL: фоновые процессы
19 июня 2023
При разработке расширений PostgreSQL иногда требуется запустить отдельный процесс, который выполняет какие-то действия в фоне, без участия пользователя. Такой процесс называется background worker. Давайте разберемся, как все это устроено.
Background worker’ы запускаются процессом postmaster, так же, как и в случае с уже знакомыми нам checkpointer, walwriter и другими процессами СУБД. Для запуска background worker’ов предусмотрен специальный API для расширений, который мы рассмотрим далее. Стоит отметить, что хотя checkpointer и другие подобные процессы качественно похожи на background worker’ы, это другой тип процессов. Они не запускаются через API для запуска background worker’ов.
Есть два способа запустить background worker’а:
- Через вызов
RegisterBackgroundWorker()
. Расширение должно быть подгружено в postmaster при помощиshared_preload_libraries
. Вызов функции должен производиться из_PG_init()
; - При помощи вызова
RegisterDynamicBackgroundWorker()
из обычного бэкенда;
Если вы не понимаете, о каких таких shared_preload_libraries
и _PG_init()
идет речь, ознакомьтесь с постом Расширения PostgreSQL: разделяемая память и локи.
В исходном коде PostgreSQL есть расширение worker_spi, демонстрирующее оба метода запуска. Посмотрим на него в действии:
USE_PGXS=1 make
USE_PGXS=1 make install
В postgresql.conf дописываем:
worker_spi.database = eax
worker_spi.total_workers = 1
Перезапускаем СУБД:
Должны увидеть таблицу counted
в схеме schema1
:
type | value
------+-------
Если теперь записать туда таких данных:
INSERT 0 2
eax=# SELECT * FROM schema1.counted;
type | value
-------+-------
total | 0
delta | 1
(2 rows)
… то через какое-то время содержимое таблицы само собой изменится:
type | value
-------+-------
total | 1
(1 row)
Это созданный расширением background worker увидел изменения и в фоне обновил содержимое таблицы. Заметьте, что для текущей базы данных мы даже не делали CREATE EXTENSION
. Как же это работает?
Поскольку расширение прописано в shared_preload_libraries
, при запуске СУБД оно загружается postmaster’ом и управление передается в _PG_init()
.
В упрощенном виде реализация функции выглядит так:
if (!process_shared_preload_libraries_in_progress)
return;
memset(&worker, 0, sizeof(worker));
/*
* Воркер имеет доступ к разделяемой памяти и может
* ходить в таблицы
*/
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
/*
* Воркер запускается, когда система переходит
* в нормальное рабочее состояние
*/
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
/* В случае ошибки воркер не перезапускается */
worker.bgw_restart_time = BGW_NEVER_RESTART;
/* Название динамической библиотеки с кодом воркера */
sprintf(worker.bgw_library_name, "worker_spi");
/* Имя функции, которой следует передать управление */
sprintf(worker.bgw_function_name, "worker_spi_main");
/*
* Здесь можно указать id процесса, которому postmaster
* будет посылать SIGUSR1, когда воркер запускается или
* завершается
*/
worker.bgw_notify_pid = 0;
/* Имя этого конкретного воркера */
snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", 1);
/* Имя группы/типа воркеров */
snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
/*
* Аргумент, который будет передан воркеру. Дополнительные
* 128 байт (BGW_EXTRALEN) можно положить в bgw_extra.
*/
worker.bgw_main_arg = Int32GetDatum(1);
/* Просим postmaster запустить воркера */
RegisterBackgroundWorker(&worker);
Здесь все достаточно просто. Значение bgw_type
используется в pg_stat_activity:
… а bgw_name
— непосредственно в имени процесса:
...
56108 ? Ss 0:00 postgres: worker_spi worker 1
...
При запуске воркера управление передается в worker_spi_main()
. Упрощенно код функции выглядит таким образом:
worker_spi_main(Datum main_arg)
{
/*
* Это аргумент, переданный через bgw_main_arg.
* К значению bgw_extra можно получить доступ
* через глобальную переменную MyBgworkerEntry.
*/
int index = DatumGetInt32(main_arg);
/* Первым делом нужно указать обработчики сигналов... */
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGTERM, die);
/* ... и разблокировать обработку сигналов */
BackgroundWorkerUnblockSignals();
/*
* Подключаемся к БД, с которой хотим работать.
* Также есть BackgroundWorkerInitializeConnectionByOid()
*/
BackgroundWorkerInitializeConnection(/* (тут имя БД) */,
NULL, 0);
/* Основной цикл */
for (;;)
{
(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
/* (здесь время в миллисекундах) */,
PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
/*
* Обработать прерывания, если они были получены,
* пока процесс спал. Прерывания в PostgreSQL -
* это не то же самое, что сигналы, хотя некоторые
* прерывания реализованы на основе сигналов.
* Подробности смотри в src/include/miscadmin.h
*/
CHECK_FOR_INTERRUPTS();
/*
* Если придет SIGHUP, его обработчик проставит переменной
* ConfigReloadPending значение true и выставит защелку
* MyLatch, что приведет к пробуждению процесса
*/
if (ConfigReloadPending)
{
ConfigReloadPending = false;
/* Перечитать конфиг */
ProcessConfigFile(PGC_SIGHUP);
}
/* (тут вся полезная нагрузка воркера) */
}
}
С защелками и почему спать на них выгоднее, чем использовать pg_usleep()
, ранее мы разобрались в рамках поста Внутренности PostgreSQL: разделяемые буферы.
Итак, мы поняли, как запускается воркер, а также где у него находится основной цикл. Осталось только понять, как воркер ходит в таблицы. А в таблицы он ходит по обычному SPI. Только здесь есть одна особенность, специфичная именно для background worker’ов.
Чтобы сходить в базу данных по SPI, код должен выглядеть так:
* Имя функции говорит само за себя. Вызов запоминает
* время начала исполнения следующего SQL-выражения.
* Вызов может быть не лишено смысла делать перед
* исполнением каждого нового выражения
*/
SetCurrentStatementStartTimestamp();
/*
* Создаем новую транзакцию. Перед этим вызовом мы
* обязаны сделать SetCurrentStatementStartTimestamp().
* Сохраненное время используется как время начала
* исполнения транзакции
*/
StartTransactionCommand();
/*
* Этот вызов создает активный снэпшот. Последний необходим,
* чтобы последующие запросы видели MVCC данные, с которыми
* они могли бы работать
*/
PushActiveSnapshot(GetTransactionSnapshot());
/*
* Теперь, имея транзакцию, устанавливаем самое обычное
* соединение с менеджером SPI. Порядок вызова SPI_connect()
* и PushActiveSnapshot() несущественен.
*/
SPI_connect();
/* (здесь обычный код хождения по SPI) */
/*
* Закрываем соединение с SPI менеджером, как обычно.
* Делать это перед elog(ERROR), кстати, не обязательно
*/
SPI_finish();
/*
* Снимаем активный снэпшот со стека снэпшотов, освобождаем
* сопутствующие ресурсы
*/
PopActiveSnapshot();
/* Коммитим транзакцию */
CommitTransactionCommand();
Когда SPI используется в обычных хранимках на языке C, то ничего этого делать не нужно. Ведь к моменту вызова хранимки уже есть как активная транзакция, так и активный снэпшот. Но в background worker’е их приходится создавать вручную.
Как видите, в использовании background worker’ов нет ничего сверх сложного. Пример создания воркера через вызов RegisterDynamicBackgroundWorker()
вы найдете в том же worker_spi.с. Этот код не сложнее рассмотренного выше. Если же вам хочется примеров посложнее, попробуйте поизучать исходники расширения TimescaleDB. В нем background worker’ы используются довольно активно.
Дополнение: Расширения PostgreSQL: конфигурационные параметры
Метки: C/C++, PostgreSQL, СУБД.
Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.