Тонкости реализации непрерывных агрегатов в TimescaleDB

6 октября 2021

Тут по работе возникла необходимость разобраться в тонкостях устройства непрерывных агрегатов в TimescaleDB. Вашему вниманию предлагается конспект того, что мне удалось разузнать и как-то систематизировать. Этот конспект не претендует на полноту и для упрощения намеренно обходит стороной, к примеру, нюансы, касающиеся распределенных гипертаблиц.

Эксперименты проводились на TimescaleDB 2.5.0-dev. Создадим гипертаблицу и построим по ней непрерывный агрегат:

CREATE TABLE conditions(
  ts DATE NOT NULL,
  city TEXT NOT NULL,
  temperature INT NOT NULL);

SELECT create_hypertable(
  'conditions', 'ts',
  chunk_time_interval => INTERVAL '1 day'
);

INSERT INTO conditions (ts, city, temperature) VALUES
  ('2021-06-14', 'Moscow', 26),
  ('2021-06-15', 'Moscow', 22),
  ('2021-06-16', 'Moscow', 24),
  ('2021-06-17', 'Moscow', 24),
  ('2021-06-18', 'Moscow', 27),
  ('2021-06-19', 'Moscow', 28),
  ('2021-06-20', 'Moscow', 30),
  ('2021-06-21', 'Moscow', 31),
  ('2021-06-22', 'Moscow', 34),
  ('2021-06-23', 'Moscow', 34),
  ('2021-06-24', 'Moscow', 34),
  ('2021-06-25', 'Moscow', 32),
  ('2021-06-26', 'Moscow', 32),
  ('2021-06-27', 'Moscow', 31);

CREATE MATERIALIZED VIEW conditions_summary_weekly
WITH (timescaledb.continuous) AS
SELECT city,
       time_bucket('7 days', ts) AS bucket,
       MIN(temperature),
       MAX(temperature),
       AVG(temperature)
FROM conditions
GROUP BY city, bucket;

Что конкретно представляет собой непрерывный агрегат? Ответ на этот вопрос дает команда:

\d+ conditions_summary_weekly

Оказывается, что это самый обыкновенный VIEW. Он определен, как:

SELECT ...
FROM _timescaledb_internal._materialized_hypertable_2
WHERE ... GROUP BY ...
UNION ALL
SELECT ...
FROM conditions
WHERE ... GROUP BY ...

Процесс построения промежуточного представления агрегата на диске называется материализацией (materialization). Само же это «промежуточное представление» хранится в таблице материализации (materialization table). Здесь таблица называется _materialized_hypertable_2. При чтении из вьюхи conditions_summary_weekly ответ на пользовательский запрос формируется из материализованных данных и данных из исходной таблицы conditions, которые еще не были материализованы. Этот процесс называется финализацией (finalization). Характерно, что таблица материализации является гипертаблицей (то есть, она партицирована), как и исходная.

Посмотрим на определение _materialized_hypertable_2:

\d _timescaledb_internal._materialized_hypertable_2
Table "_timescaledb_internal._materialized_hypertable_2"
  Column  |  Type   | Collation | Nullable | Default
----------+---------+-----------+----------+---------
 city     | text    |           |          |
 bucket   | date    |           | not null |
 agg_3_3  | bytea   |           |          |
 agg_4_4  | bytea   |           |          |
 agg_5_5  | bytea   |           |          |
 chunk_id | integer |           |          |

Здесь agg_* — это частичные агрегаты (partial aggregates). Для максимума и минимума частичному агрегату достаточно хранить одно число. Но, например, для среднего необходимо хранить пару чисел sum и count. Еще один столбец chunk_id служит уникальным идентификатором записи. Для одной пары city и bucket в таблице может быть несколько записей.

Важно! Содержимое всего, что хранится в схемах _timescaledb_* является деталью реализации конкретной версии TimescaleDB. Расширение в свою очередь собирается немного по-разному в зависимости от используемой версии PostgreSQL. Пользователь не должен работать с этими данными напрямую. В частности, chunk_id может быть убран в будущих версиях.

Обновление таблицы материализации может производиться вручную:

CALL refresh_continuous_aggregate(
    'conditions_summary_weekly',
    window_start => '2021-06-14',
    window_end => '2021-06-27');

… или автоматически по расписанию:

SELECT add_continuous_aggregate_policy('conditions_summary_weekly',
    start_offset => INTERVAL '15 days',
    end_offset => INTERVAL '1 day',
    schedule_interval => INTERVAL '1 day');

Примечание: Более подробное описание всех процедур и функций, а также их аргументов и возвращаемых значений доступно в официальной документации.

Интервал времени (от window_start до window_end в первом примере), за который происходит обновление агрегата, называется окном обновления (refresh window). Чтобы понять, как конкретно устроено обновление агрегатов, сначала нужно отметить важный побочный эффект от вызова refresh. Эффект заключается в обновлении порога инвалидации (invalidation threshold). Что это такое проще всего показать на примере.

Выполним такие запросы:

CALL refresh_continuous_aggregate(
    'conditions_summary_weekly',
    '2021-07-05',
    '2021-07-12');

-- Важно! Здесь столбец называется watermark, но этот термин означает
-- нечто совершенно иное, см далее. Вероятно, так произошло по
-- историческим причинам.
SELECT hypertable_id, _timescaledb_internal.to_timestamp(watermark)
FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold;

Результат:

 hypertable_id |      to_timestamp
---------------+------------------------
             1 | 2021-07-12 00:00:00+00

Теперь скажем:

CALL refresh_continuous_aggregate(
    'conditions_summary_weekly',
    '2021-07-12',
    '2021-07-19');

SELECT hypertable_id, _timescaledb_internal.to_timestamp(watermark)
FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold;

Получим результат:

 hypertable_id |      to_timestamp
---------------+------------------------
             1 | 2021-07-19 00:00:00+00

TimescaleDB всегда обновляет корзины (ведра, buckets) целиком. Поскольку нельзя обновить часть корзины, при вызове refresh окно обновления обрезается слева и справа так, чтобы в него входили только целые корзины. Напомню, что в нашем примере размер корзины составляет неделю. Данные обновляются за обрезанное окно обновления, а правая граница окна записывается в порог инвалидации, как показано выше. Порог инвалидации — это временная метка, данные до которой были отправлены на материализацию (но не факт, что они уже материализованы, см далее). Если refresh никогда не вызывался, порогом инвалидации является минус бесконечность. Порог инвалидации никогда не уменьшается, он может только увеличиваться.

А что будет, если мы запишем в conditions данные с меткой меньше порога инвалидации? Или обновим/удалим данные, которые ранее использовались для заполнения таблицы материализации?

Например:

INSERT INTO conditions (ts, city, temperature)
VALUES ('2021-06-01', 'Moscow', 25),
       ('2021-06-02', 'Moscow', 25);

Специально для таких случаев на исходной гипертаблице висит триггер:

\d conditions
...
Triggers:
    ts_cagg_invalidation_trigger
    AFTER INSERT OR DELETE OR UPDATE ON conditions
    FOR EACH ROW EXECUTE FUNCTION
    _timescaledb_internal.continuous_agg_invalidation_trigger('1')
....

Он называется триггером инвалидации (invalidation trigger). Триггер смотрит на временную метку добавляемых или изменяемых данных. Если эта метка больше порога инвалидации, то все в порядке. Мы просто дописываем или изменяем горячие данные. Если же метка меньше порога инвалидации, значит материализованные данные больше не верны. В этом случае триггер создает запись в логе инвалидации гипертаблиц (hypertable invalidation log):

SELECT
    hypertable_id,
    _timescaledb_internal.to_timestamp(lowest_modified_value),
    _timescaledb_internal.to_timestamp(greatest_modified_value)
FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log;

Результат:

-[ RECORD 1 ]-+-----------------------
hypertable_id | 1
to_timestamp  | 2021-06-01 00:00:00+00
to_timestamp  | 2021-06-01 00:00:00+00
-[ RECORD 2 ]-+-----------------------
hypertable_id | 1
to_timestamp  | 2021-06-02 00:00:00+00
to_timestamp  | 2021-06-02 00:00:00+00

Интервалы времени из лога инвалидации гипертаблиц называются интервалами инвалидации (invalidation interval).

Данные всегда дописываются в лог, чтобы система корректно работала независимо от используемого уровня изоляции. Транзакции, работающие на уровнях изоляции выше READ COMMITTED, не могут видеть последнее актуальное значение порога инвалидации. Поэтому они игнорируют его значение и всегда пишут в лог инвалидации гипертаблиц. Заинтересованным читателям предлагается проверить это в качестве упражнения.

Вы еще помните определение conditions_summary_weekly через пару SELECT’ов с UNION ALL? Из определения следует, что обновленные данные за прошлое в непрерывном агрегате мы сразу не увидим. Ведь эти данные должны были учитываться при материализации, но не учитывались. Данные станут видны только после того, как будет произведен refresh. Притом, не любой, а с правильным окном обновления. Окно обновления должно захватывать временные метки новых данных. При этом необходимо учесть обрезание окна по границе целиком входящих в него корзин.

Если поискать по каталогу TimescaleDB, можно обнаружить еще один лог, лог инвалидации таблиц материализации (materialization invalidation log):

\d _timescaledb_catalog.continuous_aggs_materialization_invalidation_log

Он очень похож на лог инвалидации гипертаблиц. Зачем же нужно два лога? Когда мы делаем refresh, он происходит в два этапа.

На первом этапе записи из лога инвалидации гипертаблиц копируются в лог инвалидации таблиц материализации. Тут важно вспомнить, что по одной гипертаблице может быть построено несколько непрерывных агрегатов. Поэтому из одной записи лога инвалидации гипертаблиц в логе инвалидации таблиц материализации будет создано по одной записи на каждый агрегат. После копирования лог инвалидации гипертаблиц подчищается простым DELETE’ом, происходит обновление порога инвалидации, и транзакция COMMIT’ится. Первый этап считается быстрым в предположении, что в логе инвалидации гипертаблиц мало записей (в идеале их вообще не должно быть).

На втором этапе обрабатывается лог инвалидации таблиц материализации, и наконец-то случается та самая материализация. Это считается медленным этапом. Во время итерации по записям из лога они по возможности мержуются и складываются в так называемое хранилище кортежей (tuple store). Это примитив из кодобазы PostgreSQL. Далее таблица материализации заполняется на основе померженых записей из лога, данных из исходной гипертаблицы и аргументов, переданных refresh’у. Лог инвалидации таблиц материализации чистится простым DELETE’ом, и происходит COMMIT.

Обратите внимание, что вызов refresh производится для конкретного агрегата. На первом этапе записи копируются для всех агрегатов исходной гипертаблицы. На втором этапе обрабатываются записи одного агрегата, для которого был вызван refresh. refresh_continuous_aggregate() является синхронной процедурой. Она не планирует никаких отложенных операций и возвращает управление после полного выполнения обоих этапов.

Реализация refresh в два этапа выгодна тем, что исходная таблица блокируется по записи только на время первого, быстрого этапа. Блокировка нужна для того, чтобы записи однозначно делились на те, что случились до порога инвалидации, и те, что случились после. Сам же порог инвалидации является не чем иным, как оптимизацией для транзакций с уровнем READ COMMITTED и ниже. При работе с временным рядами более строгие уровни изоляции обычно не требуются. Без порога инвалидации записи в лог инвалидации гипертаблиц пришлось бы делать при каждой вставке в исходную таблицу.

Порог инвалидации не фигурирует в определении VIEW непрерывного агрегата. Как минимум, это создало бы гонку, поскольку между двумя этапами refresh’а порог инвалидации уже обновлен, но данные еще не материализованы. Кроме того, порог инвалидации один на гипертаблицу, а построенных по ней агрегатов много. Вместо порога инвалидации VIEW использует watermark. Это временная метка, до которой были материализованы данные у заданного агрегата:

SELECT
    mat_hypertable_id,
    _timescaledb_internal.to_timestamp(
        _timescaledb_internal.cagg_watermark(mat_hypertable_id)
    )
FROM _timescaledb_catalog.continuous_agg;

Детали реализации cagg_watermark() можно найти в src/continuous_agg.c. В сишном коде функция называется ts_continuous_agg_watermark.

Вот так примерно устроены непрерывные агрегаты. Вынужден признать, что представленная информация не полна, и не обязательно на 100% верна. Если вы хотите знать абсолютно все детали в совершенно достоверном виде, то их необходимо искать в коде конкретной версии TimescaleDB. Тем не менее, даже такие знания крайне полезны при решении практических задач.

Метки: , .