Расширения PostgreSQL: фоновые процессы

19 июня 2023

При разработке расширений PostgreSQL иногда требуется запустить отдельный процесс, который выполняет какие-то действия в фоне, без участия пользователя. Такой процесс называется background worker. Давайте разберемся, как все это устроено.

Background worker’ы запускаются процессом postmaster, так же, как и в случае с уже знакомыми нам checkpointer, walwriter и другими процессами СУБД. Для запуска background worker’ов предусмотрен специальный API для расширений, который мы рассмотрим далее. Стоит отметить, что хотя checkpointer и другие подобные процессы качественно похожи на background worker’ы, это другой тип процессов. Они не запускаются через API для запуска background worker’ов.

Есть два способа запустить background worker’а:

  • Через вызов RegisterBackgroundWorker(). Расширение должно быть подгружено в postmaster при помощи shared_preload_libraries. Вызов функции должен производиться из _PG_init();
  • При помощи вызова RegisterDynamicBackgroundWorker() из обычного бэкенда;

Если вы не понимаете, о каких таких shared_preload_libraries и _PG_init() идет речь, ознакомьтесь с постом Расширения PostgreSQL: разделяемая память и локи.

В исходном коде PostgreSQL есть расширение worker_spi, демонстрирующее оба метода запуска. Посмотрим на него в действии:

cd src/test/modules/worker_spi
USE_PGXS=1 make
USE_PGXS=1 make install

В postgresql.conf дописываем:

shared_preload_libraries = worker_spi
worker_spi.database = eax
worker_spi.total_workers = 1

Перезапускаем СУБД:

pg_ctl restart -D ~/projects/pginstall/data-master

Должны увидеть таблицу counted в схеме schema1:

=# SELECT * FROM schema1.counted;
 type | value
------+-------

Если теперь записать туда таких данных:

eax=# INSERT INTO schema1.counted VALUES ('total', 0), ('delta', 1);
INSERT 0 2

eax=# SELECT * FROM schema1.counted;
 type  | value
-------+-------
 total |     0
 delta |     1
(2 rows)

… то через какое-то время содержимое таблицы само собой изменится:

eax=# SELECT * FROM schema1.counted;
 type  | value
-------+-------
 total |     1
(1 row)

Это созданный расширением background worker увидел изменения и в фоне обновил содержимое таблицы. Заметьте, что для текущей базы данных мы даже не делали CREATE EXTENSION. Как же это работает?

Поскольку расширение прописано в shared_preload_libraries, при запуске СУБД оно загружается postmaster’ом и управление передается в _PG_init().

В упрощенном виде реализация функции выглядит так:

BackgroundWorker worker;

if (!process_shared_preload_libraries_in_progress)
    return;

memset(&worker, 0, sizeof(worker));

/*
 * Воркер имеет доступ к разделяемой памяти и может
 * ходить в таблицы
 */

worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
                   BGWORKER_BACKEND_DATABASE_CONNECTION;
/*
 * Воркер запускается, когда система переходит
 * в нормальное рабочее состояние
 */

worker.bgw_start_time = BgWorkerStart_RecoveryFinished;

/* В случае ошибки воркер не перезапускается */
worker.bgw_restart_time = BGW_NEVER_RESTART;

/* Название динамической библиотеки с кодом воркера */
sprintf(worker.bgw_library_name, "worker_spi");

/* Имя функции, которой следует передать управление */
sprintf(worker.bgw_function_name, "worker_spi_main");

/*
 * Здесь можно указать id процесса, которому postmaster
 * будет посылать SIGUSR1, когда воркер запускается или
 * завершается
 */

worker.bgw_notify_pid = 0;

/* Имя этого конкретного воркера */
snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", 1);

/* Имя группы/типа воркеров */
snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");

/*
 * Аргумент, который будет передан воркеру. Дополнительные
 * 128 байт (BGW_EXTRALEN) можно положить в bgw_extra.
 */

worker.bgw_main_arg = Int32GetDatum(1);

/* Просим postmaster запустить воркера */
RegisterBackgroundWorker(&worker);

Здесь все достаточно просто. Значение bgw_type используется в pg_stat_activity:

SELECT * FROM pg_stat_activity WHERE backend_type = 'worker_spi';

… а bgw_name — непосредственно в имени процесса:

$ ps ax | grep postgres
...
56108 ?        Ss     0:00 postgres: worker_spi worker 1
...

При запуске воркера управление передается в worker_spi_main(). Упрощенно код функции выглядит таким образом:

void
worker_spi_main(Datum main_arg)
{
    /*
     * Это аргумент, переданный через bgw_main_arg.
     * К значению bgw_extra можно получить доступ
     * через глобальную переменную MyBgworkerEntry.
     */

    int  index = DatumGetInt32(main_arg);

    /* Первым делом нужно указать обработчики сигналов... */
    pqsignal(SIGHUP, SignalHandlerForConfigReload);
    pqsignal(SIGTERM, die);

    /* ... и разблокировать обработку сигналов */
    BackgroundWorkerUnblockSignals();

    /*
     * Подключаемся к БД, с которой хотим работать.
     * Также есть BackgroundWorkerInitializeConnectionByOid()
     */

    BackgroundWorkerInitializeConnection(/* (тут имя БД) */,
                                         NULL, 0);

    /* Основной цикл */
    for (;;)
    {
        (void) WaitLatch(MyLatch,
            WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
            /* (здесь время в миллисекундах) */,
            PG_WAIT_EXTENSION);
        ResetLatch(MyLatch);

        /*
         * Обработать прерывания, если они были получены,
         * пока процесс спал. Прерывания в PostgreSQL -
         * это не то же самое, что сигналы, хотя некоторые
         * прерывания реализованы на основе сигналов.
         * Подробности смотри в src/include/miscadmin.h
         */

        CHECK_FOR_INTERRUPTS();

        /*
         * Если придет SIGHUP, его обработчик проставит переменной
         * ConfigReloadPending значение true и выставит защелку
         * MyLatch, что приведет к пробуждению процесса
         */

        if (ConfigReloadPending)
        {
            ConfigReloadPending = false;

            /* Перечитать конфиг */
            ProcessConfigFile(PGC_SIGHUP);
        }

        /* (тут вся полезная нагрузка воркера) */
    }
}

С защелками и почему спать на них выгоднее, чем использовать pg_usleep(), ранее мы разобрались в рамках поста Внутренности PostgreSQL: разделяемые буферы.

Итак, мы поняли, как запускается воркер, а также где у него находится основной цикл. Осталось только понять, как воркер ходит в таблицы. А в таблицы он ходит по обычному SPI. Только здесь есть одна особенность, специфичная именно для background worker’ов.

Чтобы сходить в базу данных по SPI, код должен выглядеть так:

/*
 * Имя функции говорит само за себя. Вызов запоминает
 * время начала исполнения следующего SQL-выражения.
 * Вызов может быть не лишено смысла делать перед
 * исполнением каждого нового выражения
 */

SetCurrentStatementStartTimestamp();

/*
 * Создаем новую транзакцию. Перед этим вызовом мы
 * обязаны сделать SetCurrentStatementStartTimestamp().
 * Сохраненное время используется как время начала
 * исполнения транзакции
 */

StartTransactionCommand();

/*
 * Этот вызов создает активный снэпшот. Последний необходим,
 * чтобы последующие запросы видели MVCC данные, с которыми
 * они могли бы работать
 */

PushActiveSnapshot(GetTransactionSnapshot());

/*
 * Теперь, имея транзакцию, устанавливаем самое обычное
 * соединение с менеджером SPI. Порядок вызова SPI_connect()
 * и PushActiveSnapshot() несущественен.
 */

SPI_connect();


/* (здесь обычный код хождения по SPI) */


/*
 * Закрываем соединение с SPI менеджером, как обычно.
 * Делать это перед elog(ERROR), кстати, не обязательно
 */

SPI_finish();

/*
 * Снимаем активный снэпшот со стека снэпшотов, освобождаем
 * сопутствующие ресурсы
 */

PopActiveSnapshot();

/* Коммитим транзакцию */
CommitTransactionCommand();

Когда SPI используется в обычных хранимках на языке C, то ничего этого делать не нужно. Ведь к моменту вызова хранимки уже есть как активная транзакция, так и активный снэпшот. Но в background worker’е их приходится создавать вручную.

Как видите, в использовании background worker’ов нет ничего сверх сложного. Пример создания воркера через вызов RegisterDynamicBackgroundWorker() вы найдете в том же worker_spi.с. Этот код не сложнее рассмотренного выше. Если же вам хочется примеров посложнее, попробуйте поизучать исходники расширения TimescaleDB. В нем background worker’ы используются довольно активно.

Дополнение: Расширения PostgreSQL: конфигурационные параметры

Метки: , , .


Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.