Интересный пример с роутингом и кэшами в Akka Cluster

27 апреля 2015

Во многих серверных приложениях часто возникает необходимость кэшировать какие-то данные, так как количество тактов процессора ограничено, хождение в базу по сети занимает ощутимое время, да и пулы коннектов не резиновые. Сделали кэши, все хорошо. Но как только ваше приложение начинает работать более, чем на одном сервере, возникает проблема инвалидации кэшей. Когда пользователь пишет какие-то данные через один сервер, требуется обновлять или сбрасывать кэши на всех остальных серверах. Отчасти проблему позволяет решить Memcached, но с ним другие проблемы — (1) неизбежные накладные расходы на хождение по сети и (2) не всегда понятно, как сделать так, чтобы данные в кэше не разъехались в граничных случаях типа нетсплитов и поднятия новых кэш-серверов. Оказывается, что Akka Cluster из коробки предлагает довольно интересное решение озвученных проблем.

Допустим, мы решили закэшировать профили пользователей. Так как мы используем Akka Cluster, узлы приложения могут спокойно обращаться с запросами друг к другу, подписываться на происходящие в кластере события, вот это все. А давайте каждый узел будет кэшировать профили определенных пользователей. За каким профилем в какой узел ходить будет определяться по sharding key, в качестве которого для примера возьмем user id. Таким образом мы автоматически решаем проблему инвалидации кэшей, так как кэш по каждому ключу хранится в одном месте. Многие запросы узлы будут посылать сами себе, избегая тем самым накладных расходов на хождение по сети. Так как мы используем акторы, все запросы к кэшам, как на чтение, так и на запись, происходят последовательно, благодаря чему решается проблема получения неконсистентных данных из-за параллельной записи. А если один из узлов упадет? Кластер быстро узнает об этом и кэши будут перераспределены! Разумеется, остается несколько тонких моментов, но о них чуть ниже.

Все описанное достигается с помощью Akka Cluster на удивление просто. Далее предполагается, что в приложении уже используется Akka Cluster, а также класс ClusterListener. Как это делается было рассказано в предыдущих двух заметках, посвященных Akka Cluster. Далее открываем application.conf и в секцию akka.actor дописываем:

deployment {
  /profileManager/router {
    router = consistent-hashing-group
    nr-of-instances = 16
    routees.paths = ["/user/profileManager"]
    cluster {
      enabled = on
      allow-local-routees = on
    }
  }
}

Это описание consistent hashing роутера. Подробнее о роутерах можно узнать из официальной документации по Akka. Видов роутеров в Akka очень много, помимо consistent hashing есть еще и random, round robin, broadcast и многие другие стратегии. Кроме того, при желании можно написать свою стратегию.

Чтобы сообщение передавалось через роутер правильному узлу, сообщения должны реализовывать трейт ConsistentHashable, присваивая полю consistentHashKey значение, по которому должен осуществляться роутинг. Мы введем небольшой вспомогательный «маршрутезируемый» класс для оборачивания всех других сообщений перед передачей через роутер:

import akka.routing.ConsistentHashingRouter.ConsistentHashable

case class RoutedMsg[T](key: T, msg: Any) extends ConsistentHashable {
  val consistentHashKey = key
}

Кроме того, для удобства введем еще один очень простой трейт:

trait RoutedMsgWithId {
  val id: Long
}

Информация о профиле, которую будем хранить в кэше:

case class ProfileInfo(uid: Long, username: String, email: String)

Работающие локально акторы ничего не знают о том, где находятся кэши. Поэтому введем новую сущность — ProfileManager:

object ProfileManager {
  val name = "profileManager"

  def props() = Props[ProfileManager]
}

class ProfileManager extends Actor with ActorLogging {
  val managerRouter = context.actorOf(
                        Props.empty.withRouter(FromConfig),
                        "router"
                      )

  override def receive = LoggingReceive {
    // сообщение от самого себя или другого менеджера
    case r@RoutedMsg(uid: Long, msg: Any) =>
      val actorName = s"profile-$uid"
      context.child(actorName) getOrElse {
        context.actorOf(ProfileActor.props(uid), actorName)
      } forward msg

    // сообщение с текущей ноды
    case msg: RoutedMsgWithId =>
      managerRouter forward RoutedMsg(msg.id, msg)
  }
}

Это актор, экземпляр которого локально запущен на каждом узле в кластере. Локальные акторы посылают менеджеру запросы к кэш-акторам. Запросы должны реализовывать трейт RoutedMsgWithId. Менеджер оборачивает запросы в RoutedMsg и отдает роутеру. Роутер смотрит на consistentHashKey сообщения и пересылает его соответствующему менеджеру. Это может быть как менеджер на удаленном узле, так и тот же самый менеджер, что передал сообщение роутеру. При получении RoutedMsg (от другого узла или локального) менеджер извлекает из него оригинальное сообщение, запускает локальный актор-кэш, если он еще не был запущен, и пересылает сообщение ему.

Реализация самого кэш-актора довольно тривиальна:

object ProfileActor {

  def props(uid: Long) = Props(new ProfileActor(uid))

  case class GetProfile(id: Long) extends RoutedMsgWithId

  case class AskExt(mngRef: ActorRef) {
    def getProfile(uid: Long) = {
      (mngRef ? GetProfile(uid)).mapTo[ProfileInfo]
    }
  }
}

class ProfileActor(uid: Long) extends Actor with ActorLogging {
  import me.eax.akka_examples.profile.ProfileActor._

  private val actorLifetime = 1.minute // TODO: read from config

  override def preStart() {
    println(s"ProfileActor($uid): started!")
    context.system.scheduler.scheduleOnce(
      actorLifetime, self, PoisonPill
    )
  }

  override def postStop() {
    println(s"ProfileActor($uid): stopped!")
  }

  override def receive = LoggingReceive { case req =>
    println(
      s"ProfileActor($uid): Request received: $req from " +
      sender().path.address
    )

    req match {
      case r: GetProfile =>
        sender ! ProfileInfo(uid, s"user$uid", s"user$uid@gmail.com")
    }
  }
}

В приведенной коде актор автоматически убивает сам себя после одной минуты жизни. Это сделано по той причине, что профиль пользователя может быть уже никому не интересен, а память, как правило, не резиновая. Альтернативный вариант убийства кэш-актора заключается в использовании receive timeout, но с этим подходом нужно быть осторожным, так как в общем случае актор может время от времени посылать сам себе сообщения, например, для обновления кэша. В этом случае receive timeout может никогда ему не прийти. Более интересная стратегия заключается в том, чтобы убивать кэши в ProfileManager при достижении определенного их количества, таким образом избегая совершенно ненужных повторных инициализаций кэшей.

Наконец, связывается все описанное выше примерно так:

val profileManager = {
  ProfileActor.AskExt(
    system.actorOf(ProfileManager.props(), ProfileManager.name)
  )
}
val id = Random.nextInt(10).toLong
profileManager.getProfile(id)

Полную версию исходного кода к данному посту вы найдете в этом репозитории. Можете собрать кластер из трех узлов по аналогии с тем, как мы это делали в предыдущих заметках, и убедиться, что запросы распределяются по разным узлам в зависимости от user id.

Еще пара нюансов, о которых нужно помнить. Во-первых, в кластерах случаются нетсплиты и падения машин, притом узлы кластера узнают о них не сразу. Таким образом, кэши могут быть какое-то время недоступны и, что намного важнее, узлы могут в течение какого-то времени искать кэши на разных узлах, в результате чего можно получить неконсистентное состояние в кэше. Решать это можно по-разному в зависимости от специфики вашего приложения. Можно просто забить, например, если данные в кэшах неизменяемые, можно отклонять запросы к кэшу первые N секунд после его создания, и так далее. Во-вторых, в полной версии кода вы найдете два типа кэшей — кэши сессий и кэши профилей. И вы можете легко убедиться, что кэш сессии и профиля одного и того же пользователя может оказаться на разных узлах кластера. Это может быть или не быть проблемой в зависимости, опять таки, от специфики вашего приложения. В общем случае я бы советовал группировать разные типы данных с одинаковым ключом, так как в ряде случаев это позволяет уменьшить число запросов и объем трафика между узлами, реализовать атомарное обновление профиля и сессий, и так далее. Сделать группировку очень просто, используя один актор-менеджер (и, возможно, один актор-кэш) на несколько типов данных вместо двух. Наконец, в-третьих, не стоит забывать о проблеме холодных кэшей. Решается она по-разному, от превращения акторов-кэшей в прокси к Memcached, до периодического сохранения кэшей в S3 и восстановления оттуда при старте приложения.

Как всегда, я буду рад любым вашим дополнениям, вопросам, а также просто мыслям по теме роутинга и кэширования.

Метки: , , .


Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.