Добавляем интроспекцию в Akka при помощи 100 строк кода

16 ноября 2015

В холиварах на тему Erlang/OTP против Scala/Akka сторонники Erlang часто используют аргумент, что дескать в Akka нет интроспекции. Имеется в виду, что нельзя получить текущее состояние актора, нет аналога etop или Observer, и вот это все. Это правда, сама по себе Akka таких инструментов не предоставляет. Но, оказывается, на практике их можно очень просто реализовать своими силами.

Не будем ходить вокруг да около. Вот код:

// Этот трейт подмешивается во всем проекте там, где используется ask
trait AskHelper {
  implicit val timeout = Timeout(10.seconds)
}

case object Introspection extends AskHelper {
  case class QueueEntry(msg: Any, sender: ActorRef)
  case class ActorState(initDone: Boolean, queue: Queue[QueueEntry],
                        currentMessage: Option[Any])

  private case object GetActorState
  private case object ManualRestart
  private case object ManualStop

  def getActorState(ref: ActorRef): Future[ActorState] = {
    (ref ? GetActorState).mapTo[ActorState]
  }

  def manualRestart(ref: ActorRef): Future[Unit] = {
    (ref ? ManualRestart).mapTo[Unit]
  }

  def manualStop(ref: ActorRef): Future[Unit] = {
    (ref ? ManualStop).mapTo[Unit]
  }

  // TODO: getRunningActors, сделать свой реестр работающих акторов
}

trait Introspection extends Actor with ActorLogging {
  import Introspection._

  private case object Ready
  private case object Initializing

  private var queue = Queue[QueueEntry]()

  private var currentMessage: Option[Any] = Some(Initializing)

  protected def init: Future[Unit] = Future.successful({})

  protected def initialized: PartialFunction[Any, Future[Any]]

  protected def currentQueueSize(): Int = {
    queue.length
  }

  override def preStart(): Unit = {
    wait(Initializing, init)
  }

  override def receive: Receive = {
    case GetActorState => sender ! currentState()
    // see http://doc.akka.io/docs/akka/snapshot/scala/
    //     fault-tolerance.html#Default_Supervisor_Strategy
    case ManualRestart =>
      sender ! {}
      throw new RuntimeException("Manual restart")
    case ManualStop =>
      sender ! {}
      self ! Kill
  }

  private def waiting: Receive = receive orElse {
    case Ready =>
      currentMessage = None

      if(queue.isEmpty) {
        context.become(ready, discardOld = true)
      } else {
        val (stashedMsg, newQueue) = queue.dequeue
        queue = newQueue
        self.tell(stashedMsg, stashedMsg.sender)
      }

    case QueueEntry(msg, _) =>
      wait(msg, initialized(msg))

    case msg if initialized.isDefinedAt(msg) =>
      queue = queue.enqueue(QueueEntry(msg, sender()))
  }

  private def ready: Receive = receive orElse {
    case msg if initialized.isDefinedAt(msg) =>
      log.debug(s"processing msg: $msg")
      wait(msg, initialized(msg))
  }

  private def wait(msg: Any, fLazyWaitFor: => Future[Any]): Unit = {
    currentMessage = Some(msg)

    val selfRef = self

    val fWaitFor = Try(fLazyWaitFor).recover { case err =>
                     Future.failed(err)
                   }.get
    // ^ keep this line, init and initialized methods
    //   can throw exceptions

    context.become(waiting, discardOld = true)
    fWaitFor.onSuccess{ case _ => selfRef ! Ready }
    fWaitFor.onFailure{ case _ => selfRef ! PoisonPill }
  }

  private def currentState(): ActorState = {
    val initDone = !currentMessage.contains(Initializing)
    ActorState(initDone, queue, currentMessage)
  }
}

Идея следующая. Все акторы в проекте должны реализовывать следующие методы:

// Инициализация актора
def init: Future[Unit] = Future.successful({})

// Обработка сообщения
def initialized: PartialFunction[Any, Future[Any]]

Актор может находится в двух состояниях — ready и waiting. Когда актор находится в состоянии ready, при приходе нового сообщения делается вызов метода initialized, после чего актор переходит в состояние waiting. Пока актор находится в этом состоянии, все новые сообщения помещаются в queue. Кроме того, есть «системные» сообщения, позволяющие получить текущее состояние актора (например, содержимое очереди) независимо от того, находится ли актор в waiting или ready. При завершении футуры, полученной при вызове initialized, актор снова переходит в состояние ready. При этом делается проверка, нет ли в queue еще не обработанных сообщений. Если есть — извлекаем сообщение, снова переходим в waiting.

При этом мы по сути переходим от семантики Akka, когда сообщения могут обрабатываться параллельно в разных футурах, к семантике Erlang, где сообщения обрабатываются одно за другим. В итоге убиваются сразу два зайца. Добавляется интроспекция и становится на порядок меньше гонок.

Собственно, это все! При желании можно легко прикрутить реестр с именами всех работающих в данный момент акторов, добавить HTTP-ручку, позволяющую прибивать и рестартовать акторы, получать top N акторов с самыми длинными очередями, можно настроить Nagios так, чтобы он орал, если очередь одного из акторов становится длиннее X, и так далее. Тут уже многое начинает зависеть от проекта, например, используется ли в нем Finagle или Akka HTTP, есть ли в коде какие-то исключения, которые нужно обрабатывать особым образом, и так далее. Плюс разные программисты хотели бы получать разную информацию об акторах, и в разных форматах (JSON, XML, …).

Полная версия исходного кода к этому посту доступна на GitHub. Вдумчивые читатели могут попробовать ответить на вопрос, почему в трейте Introspection используется контейнер Queue, а не готовый трейт Stash, доступный в Akka. Как всегда, любые комментарии приветствуются.

Дополнение: Dependency injection в Scala с помощью библиотеки SubCut

Метки: , , .


Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.