Пример работы с шедулером в Akka

10 декабря 2014

Иногда требуется запланировать выполнение кода на определенное время после происшествия какого-то события. Или сказать, что некий код выполняется снова и снова с заданным интервалом времени. Если в вашем проекте используется Akka, то вы можете решать такие задачи просто элементарно… на самом деле нет. Как мы скоро убедимся, даже здесь есть определенные тонкости.

Примечание: Не путайте шедулер в Akka с шедулерами в Erlang. То, что в Erlang называется шедулерами и распределяют время между акторами, в терминах Akka называется диспетчерами. Здесь же мы говорим об аналоге таймеров из Erlang’а.

Рассмотрим следующий код:

package me.eax.akka_examples

import akka.actor._
import akka.event._
import akka.pattern.pipe
import com.typesafe.config.ConfigFactory

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

import java.util.concurrent.atomic.AtomicInteger

object Database {
  val counter = new AtomicInteger(0)

  def fakeRequest(): Future[Int] = {
    val result = counter.incrementAndGet()
    Future {
      if(result == 3) throw new RuntimeException("Fake exception")
      result
    }
  }
}

case object PullCounter
case class PullResult(counter: Int)
case object PullFailed

class PullActor extends Actor {
  val period = 2.seconds
  var timerCancellable: Option[Cancellable] = None

  def scheduleTimer() = {
    timerCancellable = Some(
      context.system.scheduler.scheduleOnce(
        period, context.self, PullCounter
      )
    )
  }

  override def preStart() = scheduleTimer()

  // so we don't call preStart and schedule a new message
  // see http://doc.akka.io/docs/akka/2.2.4/scala/howto.html
  override def postRestart(reason: Throwable) = {}

  def receive = LoggingReceive {
    case PullCounter =>
      val fReq = Database.fakeRequest()
      fReq.map(counter => PullResult(counter)) pipeTo self
      fReq.onFailure{ case _ => self ! PullFailed }

    case PullFailed =>
      scheduleTimer()

    case r: PullResult =>
      if(r.counter >= 5) {
        context.system.shutdown()
      } else {
        scheduleTimer()
      }
  }
}

object Example4 extends App {
  val config = ConfigFactory.parseString(
    """
    akka.loglevel = "DEBUG"
    akka.actor.debug {
      receive = on
      lifecycle = on
    }
    """
)

  val system = ActorSystem("system", config)
  system.actorOf(Props[PullActor], "pullActor")
  system.awaitTermination()
}

Здесь мы как бы имеем довольно типичную ситуацию. Есть какой-то внешний источник данных, база данных, REST-сервис или вроде того. Ходить в него дорого и долго, да и сервис может оказаться временно недоступен. Поэтому мы заводим актор, который время от времени ходит за данными и кэширует их. В приведенном примере роль источника данных выполняет такая заглушка:

object Database {
  val counter = new AtomicInteger(0)

  def fakeRequest(): Future[Int] = {
    val result = counter.incrementAndGet()
    Future {
      if(result == 3) throw new RuntimeException("Fake exception")
      result
    }
  }
}

На самом деле, нам не важно, что является источником, лишь бы при хождении в него использовались футуры. Как видите, футура из нашей заглушки не всегда завершается успешно.

При старте актор обращается к шедулеру и говорит «пошли мне сообщение PullCounter через две секунды»:

  val period = 2.seconds
  var timerCancellable: Option[Cancellable] = None

  def scheduleTimer() = {
    timerCancellable = Some(
      context.system.scheduler.scheduleOnce(
        period, context.self, PullCounter
      )
    )
  }

  override def preStart() = scheduleTimer()

Здесь мы не используем переменную timerCancellable но в общем случае с ее помощью посылку сообщения можно попытаться отменить:

timerCancellable.foreach(_.cancel())

При получении PullCounter актор осуществляет асинхронное хождение в «базу»:

    case PullCounter =>
      val fReq = Database.fakeRequest()
      fReq.map(counter => PullResult(counter)) pipeTo self
      fReq.onFailure{ case _ => self ! PullFailed }

При получении ответа, обернутого в PullResult, актор планирует посылку нового сообщения PullCounter (заметьте, кусок про system.shutdown() нужен просто чтобы когда-нибудь остановить пример):

    case r: PullResult =>
      if(r.counter >= 5) {
        context.system.shutdown()
      } else {
        scheduleTimer()
      }

Если же хождение в базу завершается неуспешно, срабатывает onFailure и актор получает сообщение PullFailed:

    case PullFailed =>
      scheduleTimer()

Это очень важный момент, так как если бы мы его упустили, в случае ошибки актор перестал бы ходить в базу.

Следует отметить, что здесь мы использовали метод sheduleOnce и вручную планировали следующую посылку сообщения. Шедулер также имеет более простой метод shedule, позволяющий выполнять посылку сообщений регулярно без повторного планирования. Но в данном случае использовать его — плохая идея. Дело в том, что хождение в базу может занять больше заданного нами period’а в две секунды. В этом случае сообщения от шедулера начнут копиться в очереди актора, а в базу данных будет одновременно посылаться множество запросов, так как новый запрос будет создаваться до завершения предыдущего. Как результат, база ляжет, а очередь переполнится.

Если же в вашем приложении требуется именно shedule, соответствующий пример вы найдете здесь. Однако примите во внимание, что использование shedule не гарантирует вам регулярного получения заданного сообщения раз в N единиц времени. Во-первых, потому что при рестарте актора (например, из-за эксепшена) посылку сообщений придется запланировать снова, что как бы собьет весь ритм. Во-вторых, дэфолтная реализация шедулера проверяет, не пора ли послать какие-то сообщения, раз в некоторый интервал времени (который можно поменять в конфиге), поэтому особой пунктуальности здесь ожидать не следует.

И кстати, раз уж мы вспомнили про эксепшены, обратите внимание на код:

  override def postRestart(reason: Throwable) = {}

Он нужен для того, чтобы при перезапуске актора повторно не вызвался preStart. В зависимости от конкретного случая вы должны или не должны этого хотеть. Как следует обдумайте конкретную ситуацию и протестируйте все граничные случаи.

Вывод программы (лишние детали опущены):

[akka://system/user/pullActor] received handled message PullCounter
[akka://system/user/pullActor] received handled message PullResult(1)
[akka://system/user/pullActor] received handled message PullCounter
[akka://system/user/pullActor] received handled message PullResult(2)
[akka://system/user/pullActor] received handled message PullCounter
[akka://system/user/pullActor] received handled message PullFailed
[akka://system/user/pullActor] received unhandled message Failure(...
[akka://system/user/pullActor] received handled message PullCounter
[akka://system/user/pullActor] received handled message PullResult(4)
[akka://system/user/pullActor] received handled message PullCounter
[akka://system/user/pullActor] received handled message PullResult(5)
[akka://system/user] stopping
[akka://system/user/pullActor] stopped
[akka://system/user] stopped

Надеюсь, вы заметили, как ловко мы избавились от необходимости писать логи вручную, воспользовавшись LoggingReceive и правильно настроив Akka?

Как видите, при использовании шедулера приходится учитывать массу деталей. Не расслабляйтесь!

Дополнение: Делаем метрики и мониторинг для Akka при помощи Kamon

Метки: , , .


Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.