Шардинг, перебалансировка и распределенные транзакции в реляционных базах данных

16 ноября 2016

При разработке нового проекта в качестве основной СУБД нередко выбираются реляционные базы данных, такие, как PostgreSQL или MySQL. В этом действительно есть смысл. Первое время у проекта мало пользователей, и потому все данные помещаются в один сервер. При этом проект активно развивается. Нельзя заранее сказать, какой функционал в нем станет основным, а какой будет выкинут. Есть много историй о том, как мобильный дейтинг в итоге превращался в криптомессанджер, и подобного рода. РСУБД удобны на ранних этапах, потому что они универсальны. Так, PostgreSQL из коробки имеет встроенный полнотекстовый поиск, умеет эффективно работать с геоданными, а также подходит для хранения очередей и рассылки уведомлений. По мере развития проекта и роста нагрузки часть данных может быть перенесена в специализированные NoSQL решения. Также нагрузку можно распределить, поделив базу на несколько совершенно не связанных между собой баз, а также при помощи потоковой репликации. Но что делать в случае, если все это не помогло? В этом посте я постараюсь ответить на данный вопрос.

Примечание: Хочу поблагодарить gridem, sum3rman и gliush за активное участие в обсуждении поднятых в данном посте вопросов. Многие из озвученных ниже идей были позаимствованы у этих ребят.

Декомпозиция проблемы

Задачу построения горизонтально масштабируемого РСУБД-кластера можно разделить на следующие сравнительно независимые задачи:

  • Автофейловер. С ростом числа машин в системе встает проблема автоматической обработки падения этих машин. В рамках сего поста автофейловер не рассматривается, так как ему был посвящен отдельный пост Stolon: создаем кластер PostgreSQL с автофейловером. Далее эта задача считается решенной. Предполагается, что все кусочки данных (vbucket’ы) хранятся на репликасетах с автофейловером (кластерах Stolon’а). Термин репликасет позаимствован у MongoDB.
  • Шардинг, или распределение данных по репликасетам.
  • Перебалансировка, или перемещение данных между репликасетами.
  • Решардинг. Под решардингом далее понимается изменение схемы шардирования или ее параметров. Например, изменения числа частей, на которые нарезаются данные. Важно подчеркнуть, что это совершенно отдельная от перебалансировки задача и их не следует путать. Данная терминология (перебалансировка, решардинг, и что это не одно и то же) позаимствована из документации множества таких NoSQL решений, как Cassandra, CouchDB, Couchbase и Riak.
  • Распределенные транзакции. Поскольку данные распределяются по нескольким репликасетам, возникает проблема выполнения транзакций между репликасетами.
  • Автоматизация. Переизобретать все описанное выше в каждом новом проекте трудоемко и непрактично. Поэтому возникает закономерное желание как-то решить эту проблему один-единственный раз и потом повторно использовать это решение. Увы, как будет показано ниже, очень сложно представить себе универсальное решение, которое подходило бы всем. Поэтому в рамках данной заметки вопрос автоматизации не рассматривается.

Попробуем рассмотреть озвученные проблемы по отдельности.

Шардинг

Существует много схем шардирования. С довольно полным списком можно ознакомиться, например, здесь. Насколько мне известно, наиболее практичной и часто используемой схемой (в частности, она используется в Riak и Couchbase) является следующая.

Каждая единица данных относится к определенной «виртуальной корзине», или vbucket. Число vbucket определяется заранее, берется достаточно большим и обычно является степенью двойки. В Couchbase, например, по умолчанию существует 1024 vbucket’а. Для определения, к какому vbucket относится единица данных, выбирается некий ключ, однозначно определяющий единицу данных, и используется формула типа:

vbucket_num = hash(key) % 1024

Couchbase, например, является KV-хранилищем. Поэтому совершенно логично единицей данных в нем является пара ключ-значение, а ключом, определяющим единицу данных, является, собственно, строковый ключ. Но единица данных может быть и более крупной. Например, если мы пишем социальную сеть, то можем создать 1024 баз данных с одинаковой схемой, а в качестве ключа использовать идентификатор пользователя. Самое главное здесь, чтобы данные, попадающие в разные vbucket’ы, были как можно менее связанными друг с другом, а в идеале — вообще никак не связанными.

Описанным выше способом мы получаем отображение (ключ → номер vbucket). Однако это отображение не дает нам ответа на вопрос, где физически следует искать данные, то есть, к какому репликасету они относятся. Для этого используется так называемый словарь, отображающий номер vbucket’а в конкретный репликасет. Поскольку выше было сказано, что задачу автоматического фейловера мы решаем при помощи Stolon, а ему для работы нужен Consul, который, помимо прочего, является KV-хранилищем, вполне логично хранить словарь в Consul. Например, словарем может быть документ вида:

{
  "format_version": 1,
  "vbuckets": [
    "cluster1",
    "cluster2",
    ...
    "clusterN"
  ]
}

Здесь format_version нужен на случай изменения формата словаря в будущем. Также нам нужна версия (ревизия) словаря, увеличивающаяся при каждом обновлении словаря. Выше она не приведена, так как в Consul это есть из коробки для всех данных и называется ModifyIndex. Наконец, в массиве vbuckets по i-му индексу хранится имя кластера Stolon, соответствующего i-му vbucket. В случае, если в настоящее время происходит перебалансировка (см далее), вместо одного имени кластера хранится пара ["clusterFrom","clusterTo"] — откуда и куда сейчас переносятся данные.

Вы спросите, зачем так сложно? Во-первых, эта схема очень гибкая. Например, на ранних этапах развития проекта мы можем использовать один репликасет, хранящий все 1024 vbucket’а. В будущем мы можем использовать до 1024-х репликасетов. Если каждый репликасет будет хранить 1 Тб данных (далеко не предел по сегодняшним меркам), это обеспечит хранение петабайта данных во всем кластере. Во-вторых, при добавлении новых репликасетов не возникает необходимости перемещать вообще все данные туда-сюда, как это происходит, например, при использовании формулы hash(key) % num_replicasets. Наконец, мощности машин в кластере могут различаться. Эта схема позволяет распределить данные по ним неравномерно, в соответствии с мощностями.

Перебалансировка

Что делать в случае, если мы хотим переместить vbucket с одного репликасета на другой?

Начнем со словаря. Каким образом он будет изменяться при перебалансировке, было описано выше. Но перед началом переноса vbucket’ов важно убедиться, что все клиенты увидели новый словарь, в котором отражен процесс переноса. Каким образом это можно сделать? Простейшее решение заключается в том, что для каждой версии словаря раз в определенный интервал времени T (скажем, 15 секунд) клиенты пишут в Consul «словарь такой-то версии последний раз использовался тогда-то». Само собой разумеется, предполагается, что время между клиентами более-менее синхронизировано с помощью ntpd. Если словарь никем не используется уже T*2 времени, можно смело полагать, что все клиенты увидели новую версию словаря. Сами клиенты могут запрашивать последнюю версию словаря просто время от времени, или же воспользоваться механизмом Consul подписки на изменения данных.

Итак, все клиенты знают о начале перебалансировки. Далее возможны варианты.

  1. Только чтение. Проще всего запретить изменение переносимых данных. Клиенты могут читать с clusterFrom, но не могут ничего записывать. Для многих проектов такое решение вполне подходит. Например, для переноса можно выбрать время, когда системой пользуется меньше всего людей (4 часа ночи) и тем немногим, что попытаются что-то записать, честно сказать, что у проекта сейчас maintenance. Если vbucket’ы достаточно маленькие, перенос все равно займет лишь несколько минут, после чего пользователь сможет повторить запрос.
  2. Все данные неизменяемые. На первый взгляд это кажется странным, но многим реальным проектам (социальные сети, почта, IM, и прочее) на самом деле не очень-то нужны изменяемые данные. Все данные можно представить в виде лога событий. Новые данные добавляются с помощью обычного insert. В тех редких случаях, когда нужно что-то изменить, update делается через вставку новой версии данных, а delete — через вставку специальной метки, что данные больше не существуют. При таком подходе при переносе данных можно писать в clusterTo, а читать из clusterFrom и clusterTo. У этого подхода есть и ряд других преимуществ — простота синхронизации мобильных клиентов, версионность всех данных (важно в финансах), предсказуемая производительность (не нужен autovacuum), и прочие.
  3. Логическая репликация между clusterFrom и clusterTo. Ждем полной синхронизации, работая только с clusterFrom. Затем переключаемся на clusterTo и работаем только с ним. Это наиболее универсальное решение, но есть нюансы. Например, в случае с PostgreSQL логическая репликация — это очень болезненная тема. Наиболее многообещающим решением здесь является pglogical, но у него есть ряд существенных ограничений. Кроме того, последний раз, когда я пробовал pglogical, мне попросту не удалось заставить его работать. Вероятно, вам придется написать свою собственную логическую репликацию. Но это сложно и затратно по времени, так как следует корректно обрабатывать несколько транзакций, выполняющихся параллельно, откаты транзакций, и прочее в таком духе.
  4. Смешанный подход, то есть, использование (1), (2) или (3) на выбор администратора, запустившего перенос. Или же использование разных подходов для разных данных в базе.

Дополнение: Вместо pglogical вы, вероятно, захотите использовать логическую репликацию, которая начиная с PostgreSQL 10 теперь есть из коробки.

В случаях (1) и (2) данные можно переносить обычным pg_dump или воспользоваться COPY:

-- экспорт таблицы
COPY tname TO PROGRAM 'gzip > /tmp/data.gz';
-- экспорт данных по запросу
COPY (SELECT * FROM tname WHERE ...) TO PROGRAM 'gzip > /tmp/data.gz';
-- импорт данных
COPY tname FROM PROGRAM 'zcat /tmp/data.gz';

Следует также отметить, что вместо логической репликации можно использовать обычную потоковую. Для этого нужно, чтобы каждый vbucket жил на отдельном инстансе СУБД. В частности, PostgreSQL позволяет легко запускать много инстансов на одной физической машине безо всякой виртуализации. В этом случае вы, вероятно, захотите выбрать несколько меньшее число vbuckets, чем предложенные ранее 1024. Еще, как альтернативный вариант, можно реплицировать вообще все данные, а потом удалять лишние. Но это дорого и будет работать только при введении в строй совершенного нового репликасета.

На мой взгляд, наиболее правдоподобным и универсальным вариантом на сегодняшний день является использование потоковой репликации с удалением лишних данных по окончании репликации по сценарию (3). Это работает только при добавлении совершенно нового, пустого репликасета. В случае, если данные нужно слить с нескольких репликасетов в один, следует использовать pg_dump по сценарию (1).

Решардинг

Напомню, что решардингом, в отличие от перебалансировки, называется изменение числа vbucket’ов или же полное изменение схемы шардирования. Последнее по моим представлениям является очень сложной задачей, делается крайне редко, и исключительно в случае, если весь шардинг реализован непосредственно в самом приложении, а не на стороне СУБД или какого-то middleware перед ним. Здесь очень многое зависит от конкретной ситуации, поэтому далее мы будем говорить о решардинге только в контексте изменения числа vbucket’ов.

Простейший подход к решардингу — это не поддерживать его и просто заранее выбирать достаточно большее количество vbucket’ов :)

Если же решардинг все-таки требуется поддерживать, многое зависит от того, что было выбрано за единицу данных (см параграф про шардинг). Допустим, ей является строка в таблице. Тогда мы можем очень просто удвоить количество vbucket’ов. Вспомним формулу:

// было
vbucket_num = hash(key) % 1024 [ = hash(key) & 0x3FF ]
// стало
vbucket_num = hash(key) % 2048 [ = hash(key) & 0x7FF ]

После удвоения числа vbucket’ов половина ключей будут соответствовать все тому же номеру бакета, от 0 до 1023. Еще половина ключей будет перенесена в бакеты с 1024 по 2047. Притом ключ, ранее принадлежавший бакету 0, попадет в бакет 1024, ранее принадлежавший бакету 1 — в бакет 1025, и так далее (у номера бакета появится дополнительный старший бит, равный единице). Это означает, что если мы возьмем текущий словарь, и модифицируем его так:

// оператор ++ означает операцию append, присоединение массива с конца
dict.vbuckets = dict.vbuckets ++ dict.vbuckets

… то все ключи автоматически окажутся на нужных репликасетах без какого-либо переноса. Теперь, когда число vbucket’ов удвоилось, бакеты можно переносить с репликасета на репликасет, как обычно. Уменьшение числа vbucket’ов происходит аналогично в обратном порядке — сначала серия переносов, затем обновление словаря. Как и в случае с перебалансировкой, следует проверять, что все клиенты увидели новую версию словаря.

Если единицами данных являются базы данных с одинаковой схемой, все несколько сложнее. В этом случае не очень понятно, как можно быстро и правильно для общего случая разделить каждую базу на две. Похоже, лучшее, что можно сделать при такой схемы шардирования, вместо использования формулы hash(key) % 1024 просто сообщать пользователю количество vbucket’ов. В этом случае определение номера бакета по ключу, а также перенос данных в случае решардинга перекладываются на приложение. Зато число бакетов может в любой момент быть увеличено на произвольное число просто путем создания пустых бакетов. Или уменьшено путем удаления лишних бакетов, в предположении, что пользователь заблаговременно перенес из них все данные.

Распределенные транзакции

Поскольку бакеты могут быть логически связанными и храниться на разных репликасетах, иногда приходится делать транзакции между репликасетами. При правильно выбранной схеме шардирования распределенные транзакции должны выполняться редко, поскольку они всегда недешевы. Если в вашем проекте распределенные транзакции не нужно делать никогда, вам сильно повезло.

Как всегда, в зависимости от ситуации задачу можно решить разными способами. Допустим, вы решили воспользоваться описанной выше идеей с неизменяемыми данными, и каждый пользователь в вашем проекте читает данные только из своего бакета. В этом случае «транзакцию» между бакетами А и Б можно выполнить по предельно простому алгоритму:

  1. Создайте объект «транзакция», хранящий все, что вы хотите записать в бакеты А и Б.
  2. Произведите запись в бакет А. Запись должна производиться в одну локальную транзакцию, а у записанных объектов должна быть метка, к какой транзакции они относятся. Если объекты с соответствующей меткой уже записаны, ничего не делать.
  3. Аналогичным образом произведите запись в бакет Б.
  4. Удалите объект «транзакция»;

Шаги (2) и (3) могут выполняться параллельно. Если выполнение кода прервется на шаге (2), (3) или (4), «транзакцию» всегда можно будет докатить (специально предусмотренным для этого процессом). Это возможно по той причине, что операции (2) и (3) идемпотентны — их повторное выполнение приводит к тому же результату. При этом, поскольку пользователь читает данные только из своего бакета, с его точки зрения данные всегда консистентны.

Само собой разумеется, это не настоящие транзакции, но для многих проектов их будет более, чем достаточно. При определенных условиях этот подход можно применить даже в случае, если данные в бакетах изменяемые.

Описание более универсального подхода можно найти в блоге Дениса Рысцова. Также этот прием описан как минимум в документации к MongoDB и блоге CockroachDB. В общих чертах алгоритм примерно такой:

  1. Создайте объект «транзакция» с состоянием committed = false, aborted = false.
  2. При обращении к объектам в вашей базе данных на чтение и на запись указывайте в них ссылку на транзакцию. При обращении на запись в специальное поле допишите, каким станет объект в случае, если транзакция завершится успешно (локальные изменения). Если у объекта уже есть метка, и соответствующая транзакция:
    • … закоммичена, примените локальные изменения объекта и запишите свою метку.
    • … отменена, очистите локальные изменения и запишите свою метку.
    • … все еще выполняется, значит произошел конфликт. Вы можете подождать, отменить свою транзакцию, или отменить чужую транзакцию. Примите во внимание, что процесс, выполнявший другую транзакцию, мог уже умереть. Так что, как минимум при определенных условиях вы должны отменять ту, другую транзакцию (например, если она начала выполнение достаточно давно). Иначе затронутые ею объекты останутся заблокированы навсегда.
  3. Если транзакция все еще не прибита другими процессами, измените ее состояние на committed = true. Это ключевой момент алгоритма. Так как этот шаг выполняется атомарно, и все транзакции знают, как трактовать локальные изменения, у транзакции нет никаких промежуточных состояний. В любой момент времени она либо применена, либо нет.
  4. Почистите за собой — примените локальные изменения ко всем затронутым объектам, затем удалите объект «транзакция». Этот шаг не обязателен в смысле корректности алгоритма. Он просто освобождает место на диске от данных, которые стали ненужны.

Важно! Приведенное описание предполагает, что каждая операция чтения или записи выполняется в отдельной транзакции при уровне изоляции serializable. Или, в более общем случае, если СУБД не поддерживает транзакций, в одну CAS-операцию. Однако выполнение нескольких операций в одной транзакции не влияет на корректность алгоритма.

Этот алгоритм довольно неприятно применять по той причине, что абсолютно все транзакции, включая локальные, должны понимать, как трактовать локальные изменения. Алгоритм обеспечивает уровень изоляции repeatable read. Это уровень изоляции менее строгий, чем snapshot isolation и serializable, и на нем возможны некоторые аномалии (phantom read, write skew). Тем не менее, он подходит для многих приложений, если знать об его ограничениях.

Хочу еще раз подчеркнуть важность проставления метки транзакции на шаге (2) не только при записи, но и при чтении. Если этого не делать, другая транзакция может изменить объект, который вы читаете, и при повторном его прочтении вы увидите что-то другое. Если вы точно знаете, что не станете ничего писать в него, то можете просто закэшировать объект в памяти.

Заключение

Горизонтальное масштабирование РСУБД — задача решаемая. Несмотря на сложность некоторых описанных выше моментов, это ничто по сравнению со сложностями, которые вас ждут при использовании в проекте исключительно NoSQL решений. В частности, для обеспечения какой-либо консистентности придется делать распределенные транзакции, как было описано выше, практически на все.

Как вы могли убедиться, тут довольно сложно представить универсальное решение, подходящее абсолютно всем и всегда. И это мы еще упустили из виду, например, такие важные вопросы, как репликация между несколькими ДЦ и снятие консистентных бэкапов с множества репликасетов! Именно ввиду существования огромного количества возможных решений мы не рассматривали вопрос автоматизации всего описанного выше.

Дополнение: В этом контексте вас также может заинтересовать статья Поднимаем кластер CockroachDB из трех нод.

Метки: , .


Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.