← На главную

Пример работы с шедулером в Akka

Иногда требуется запланировать выполнение кода на определенное время после происшествия какого-то события. Или сказать, что некий код выполняется снова и снова с заданным интервалом времени. Если в вашем проекте используется Akka, то вы можете решать такие задачи просто элементарно… на самом деле нет. Как мы скоро убедимся, даже здесь есть определенные тонкости.

Примечание: Не путайте шедулер в Akka с шедулерами в Erlang. То, что в Erlang называется шедулерами и распределяют время между акторами, в терминах Akka называется диспетчерами. Здесь же мы говорим об аналоге таймеров из Erlang’а.

Рассмотрим следующий код:

package me.eax.akka_examples import akka.actor._ import akka.event._ import akka.pattern.pipe import com.typesafe.config.ConfigFactory import scala.concurrent._ import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global import java.util.concurrent.atomic.AtomicInteger object Database { val counter = new AtomicInteger(0) def fakeRequest(): Future[Int] = { val result = counter.incrementAndGet() Future { if(result == 3) throw new RuntimeException("Fake exception") result } } } case object PullCounter case class PullResult(counter: Int) case object PullFailed class PullActor extends Actor { val period = 2.seconds var timerCancellable: Option[Cancellable] = None def scheduleTimer() = { timerCancellable = Some( context.system.scheduler.scheduleOnce( period, context.self, PullCounter ) ) } override def preStart() = scheduleTimer() // so we don't call preStart and schedule a new message // see http://doc.akka.io/docs/akka/2.2.4/scala/howto.html override def postRestart(reason: Throwable) = {} def receive = LoggingReceive { case PullCounter => val fReq = Database.fakeRequest() fReq.map(counter => PullResult(counter)) pipeTo self fReq.onFailure{ case _ => self ! PullFailed } case PullFailed => scheduleTimer() case r: PullResult => if(r.counter >= 5) { context.system.shutdown() } else { scheduleTimer() } } } object Example4 extends App { val config = ConfigFactory.parseString( """ akka.loglevel = "DEBUG" akka.actor.debug { receive = on lifecycle = on } """) val system = ActorSystem("system", config) system.actorOf(Props[PullActor], "pullActor") system.awaitTermination() }

Здесь мы как бы имеем довольно типичную ситуацию. Есть какой-то внешний источник данных, база данных, REST-сервис или вроде того. Ходить в него дорого и долго, да и сервис может оказаться временно недоступен. Поэтому мы заводим актор, который время от времени ходит за данными и кэширует их. В приведенном примере роль источника данных выполняет такая заглушка:

object Database { val counter = new AtomicInteger(0) def fakeRequest(): Future[Int] = { val result = counter.incrementAndGet() Future { if(result == 3) throw new RuntimeException("Fake exception") result } } }

На самом деле, нам не важно, что является источником, лишь бы при хождении в него использовались футуры. Как видите, футура из нашей заглушки не всегда завершается успешно.

При старте актор обращается к шедулеру и говорит «пошли мне сообщение PullCounter через две секунды»:

val period = 2.seconds var timerCancellable: Option[Cancellable] = None def scheduleTimer() = { timerCancellable = Some( context.system.scheduler.scheduleOnce( period, context.self, PullCounter ) ) } override def preStart() = scheduleTimer()

Здесь мы не используем переменную timerCancellable но в общем случае с ее помощью посылку сообщения можно попытаться отменить:

timerCancellable.foreach(_.cancel())

При получении PullCounter актор осуществляет асинхронное хождение в «базу»:

case PullCounter => val fReq = Database.fakeRequest() fReq.map(counter => PullResult(counter)) pipeTo self fReq.onFailure{ case _ => self ! PullFailed }

При получении ответа, обернутого в PullResult, актор планирует посылку нового сообщения PullCounter (заметьте, кусок про system.shutdown() нужен просто чтобы когда-нибудь остановить пример):

case r: PullResult => if(r.counter >= 5) { context.system.shutdown() } else { scheduleTimer() }

Если же хождение в базу завершается неуспешно, срабатывает onFailure и актор получает сообщение PullFailed:

case PullFailed => scheduleTimer()

Это очень важный момент, так как если бы мы его упустили, в случае ошибки актор перестал бы ходить в базу.

Следует отметить, что здесь мы использовали метод sheduleOnce и вручную планировали следующую посылку сообщения. Шедулер также имеет более простой метод shedule, позволяющий выполнять посылку сообщений регулярно без повторного планирования. Но в данном случае использовать его – плохая идея. Дело в том, что хождение в базу может занять больше заданного нами period’а в две секунды. В этом случае сообщения от шедулера начнут копиться в очереди актора, а в базу данных будет одновременно посылаться множество запросов, так как новый запрос будет создаваться до завершения предыдущего. Как результат, база ляжет, а очередь переполнится.

Если же в вашем приложении требуется именно shedule, соответствующий пример вы найдете здесь. Однако примите во внимание, что использование shedule не гарантирует вам регулярного получения заданного сообщения раз в N единиц времени. Во-первых, потому что при рестарте актора (например, из-за эксепшена) посылку сообщений придется запланировать снова, что как бы собьет весь ритм. Во-вторых, дэфолтная реализация шедулера проверяет, не пора ли послать какие-то сообщения, раз в некоторый интервал времени (который можно поменять в конфиге), поэтому особой пунктуальности здесь ожидать не следует.

И кстати, раз уж мы вспомнили про эксепшены, обратите внимание на код:

override def postRestart(reason: Throwable) = {}

Он нужен для того, чтобы при перезапуске актора повторно не вызвался preStart. В зависимости от конкретного случая вы должны или не должны этого хотеть. Как следует обдумайте конкретную ситуацию и протестируйте все граничные случаи.

Вывод программы (лишние детали опущены):

[akka://system/user/pullActor] received handled message PullCounter [akka://system/user/pullActor] received handled message PullResult(1) [akka://system/user/pullActor] received handled message PullCounter [akka://system/user/pullActor] received handled message PullResult(2) [akka://system/user/pullActor] received handled message PullCounter [akka://system/user/pullActor] received handled message PullFailed [akka://system/user/pullActor] received unhandled message Failure(... [akka://system/user/pullActor] received handled message PullCounter [akka://system/user/pullActor] received handled message PullResult(4) [akka://system/user/pullActor] received handled message PullCounter [akka://system/user/pullActor] received handled message PullResult(5) [akka://system/user] stopping [akka://system/user/pullActor] stopped [akka://system/user] stopped

Надеюсь, вы заметили, как ловко мы избавились от необходимости писать логи вручную, воспользовавшись LoggingReceive и правильно настроив Akka?

Как видите, при использовании шедулера приходится учитывать массу деталей. Не расслабляйтесь!

Дополнение: Делаем метрики и мониторинг для Akka при помощи Kamon