← На главную

Расширения PostgreSQL: фоновые процессы

При разработке расширений PostgreSQL иногда требуется запустить отдельный процесс, который выполняет какие-то действия в фоне, без участия пользователя. Такой процесс называется background worker. Давайте разберемся, как все это устроено.

Background worker’ы запускаются процессом postmaster, так же, как и в случае с уже знакомыми нам checkpointer, walwriter и другими процессами СУБД. Для запуска background worker’ов предусмотрен специальный API для расширений, который мы рассмотрим далее. Стоит отметить, что хотя checkpointer и другие подобные процессы качественно похожи на background worker’ы, это другой тип процессов. Они не запускаются через API для запуска background worker’ов.

Есть два способа запустить background worker’а:

  • Через вызов RegisterBackgroundWorker(). Расширение должно быть подгружено в postmaster при помощи shared_preload_libraries. Вызов функции должен производиться из _PG_init();
  • При помощи вызова RegisterDynamicBackgroundWorker() из обычного бэкенда;

Если вы не понимаете, о каких таких shared_preload_libraries и _PG_init() идет речь, ознакомьтесь с постом Расширения PostgreSQL: разделяемая память и локи.

В исходном коде PostgreSQL есть расширение worker_spi, демонстрирующее оба метода запуска. Посмотрим на него в действии:

cd src/test/modules/worker_spi USE_PGXS=1 make USE_PGXS=1 make install

В postgresql.conf дописываем:

shared_preload_libraries = worker_spi worker_spi.database = eax worker_spi.total_workers = 1

Перезапускаем СУБД:

pg_ctl restart -D ~/projects/pginstall/data-master

Должны увидеть таблицу counted в схеме schema1:

=# SELECT * FROM schema1.counted; type | value ------+-------

Если теперь записать туда таких данных:

eax=# INSERT INTO schema1.counted VALUES ('total', 0), ('delta', 1); INSERT 0 2 eax=# SELECT * FROM schema1.counted; type | value -------+------- total | 0 delta | 1 (2 rows)

… то через какое-то время содержимое таблицы само собой изменится:

eax=# SELECT * FROM schema1.counted; type | value -------+------- total | 1 (1 row)

Это созданный расширением background worker увидел изменения и в фоне обновил содержимое таблицы. Заметьте, что для текущей базы данных мы даже не делали CREATE EXTENSION. Как же это работает?

Поскольку расширение прописано в shared_preload_libraries, при запуске СУБД оно загружается postmaster’ом и управление передается в _PG_init().

В упрощенном виде реализация функции выглядит так:

BackgroundWorker worker; if (!process_shared_preload_libraries_in_progress) return; memset(&worker, 0, sizeof(worker)); /* * Воркер имеет доступ к разделяемой памяти и может * ходить в таблицы */ worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; /* * Воркер запускается, когда система переходит * в нормальное рабочее состояние */ worker.bgw_start_time = BgWorkerStart_RecoveryFinished; /* В случае ошибки воркер не перезапускается */ worker.bgw_restart_time = BGW_NEVER_RESTART; /* Название динамической библиотеки с кодом воркера */ sprintf(worker.bgw_library_name, "worker_spi"); /* Имя функции, которой следует передать управление */ sprintf(worker.bgw_function_name, "worker_spi_main"); /* * Здесь можно указать id процесса, которому postmaster * будет посылать SIGUSR1, когда воркер запускается или * завершается */ worker.bgw_notify_pid = 0; /* Имя этого конкретного воркера */ snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", 1); /* Имя группы/типа воркеров */ snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi"); /* * Аргумент, который будет передан воркеру. Дополнительные * 128 байт (BGW_EXTRALEN) можно положить в bgw_extra. */ worker.bgw_main_arg = Int32GetDatum(1); /* Просим postmaster запустить воркера */ RegisterBackgroundWorker(&worker);

Здесь все достаточно просто. Значение bgw_type используется в pg_stat_activity:

SELECT * FROM pg_stat_activity WHERE backend_type = 'worker_spi';

… а bgw_name – непосредственно в имени процесса:

$ ps ax | grep postgres ... 56108 ? Ss 0:00 postgres: worker_spi worker 1 ...

При запуске воркера управление передается в worker_spi_main(). Упрощенно код функции выглядит таким образом:

void worker_spi_main(Datum main_arg) { /* * Это аргумент, переданный через bgw_main_arg. * К значению bgw_extra можно получить доступ * через глобальную переменную MyBgworkerEntry. */ int index = DatumGetInt32(main_arg); /* Первым делом нужно указать обработчики сигналов... */ pqsignal(SIGHUP, SignalHandlerForConfigReload); pqsignal(SIGTERM, die); /* ... и разблокировать обработку сигналов */ BackgroundWorkerUnblockSignals(); /* * Подключаемся к БД, с которой хотим работать. * Также есть BackgroundWorkerInitializeConnectionByOid() */ BackgroundWorkerInitializeConnection(/* (тут имя БД) */, NULL, 0); /* Основной цикл */ for (;;) { (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, /* (здесь время в миллисекундах) */, PG_WAIT_EXTENSION); ResetLatch(MyLatch); /* * Обработать прерывания, если они были получены, * пока процесс спал. Прерывания в PostgreSQL - * это не то же самое, что сигналы, хотя некоторые * прерывания реализованы на основе сигналов. * Подробности смотри в src/include/miscadmin.h */ CHECK_FOR_INTERRUPTS(); /* * Если придет SIGHUP, его обработчик проставит переменной * ConfigReloadPending значение true и выставит защелку * MyLatch, что приведет к пробуждению процесса */ if (ConfigReloadPending) { ConfigReloadPending = false; /* Перечитать конфиг */ ProcessConfigFile(PGC_SIGHUP); } /* (тут вся полезная нагрузка воркера) */ } }

С защелками и почему спать на них выгоднее, чем использовать pg_usleep(), ранее мы разобрались в рамках поста Внутренности PostgreSQL: разделяемые буферы.

Итак, мы поняли, как запускается воркер, а также где у него находится основной цикл. Осталось только понять, как воркер ходит в таблицы. А в таблицы он ходит по обычному SPI. Только здесь есть одна особенность, специфичная именно для background worker’ов.

Чтобы сходить в базу данных по SPI, код должен выглядеть так:

/* * Имя функции говорит само за себя. Вызов запоминает * время начала исполнения следующего SQL-выражения. * Вызов может быть не лишено смысла делать перед * исполнением каждого нового выражения */ SetCurrentStatementStartTimestamp(); /* * Создаем новую транзакцию. Перед этим вызовом мы * обязаны сделать SetCurrentStatementStartTimestamp(). * Сохраненное время используется как время начала * исполнения транзакции */ StartTransactionCommand(); /* * Этот вызов создает активный снэпшот. Последний необходим, * чтобы последующие запросы видели MVCC данные, с которыми * они могли бы работать */ PushActiveSnapshot(GetTransactionSnapshot()); /* * Теперь, имея транзакцию, устанавливаем самое обычное * соединение с менеджером SPI. Порядок вызова SPI_connect() * и PushActiveSnapshot() несущественен. */ SPI_connect(); /* (здесь обычный код хождения по SPI) */ /* * Закрываем соединение с SPI менеджером, как обычно. * Делать это перед elog(ERROR), кстати, не обязательно */ SPI_finish(); /* * Снимаем активный снэпшот со стека снэпшотов, освобождаем * сопутствующие ресурсы */ PopActiveSnapshot(); /* Коммитим транзакцию */ CommitTransactionCommand();

Когда SPI используется в обычных пользовательских функциях на языке C, то ничего этого делать не нужно. Ведь к моменту вызова функции уже есть как активная транзакция, так и активный снэпшот. Но в background worker’е их приходится создавать вручную.

Как видите, в использовании background worker’ов нет ничего сверх сложного. Пример создания воркера через вызов RegisterDynamicBackgroundWorker() вы найдете в том же worker_spi.с. Этот код не сложнее рассмотренного выше. Если же вам хочется примеров посложнее, попробуйте поизучать исходники расширения TimescaleDB. В нем background worker’ы используются довольно активно.

Дополнение: Расширения PostgreSQL: конфигурационные параметры