Пример работы с шедулером в Akka
10 декабря 2014
Иногда требуется запланировать выполнение кода на определенное время после происшествия какого-то события. Или сказать, что некий код выполняется снова и снова с заданным интервалом времени. Если в вашем проекте используется Akka, то вы можете решать такие задачи просто элементарно… на самом деле нет. Как мы скоро убедимся, даже здесь есть определенные тонкости.
Примечание: Не путайте шедулер в Akka с шедулерами в Erlang. То, что в Erlang называется шедулерами и распределяют время между акторами, в терминах Akka называется диспетчерами. Здесь же мы говорим об аналоге таймеров из Erlang’а.
Рассмотрим следующий код:
import akka.actor._
import akka.event._
import akka.pattern.pipe
import com.typesafe.config.ConfigFactory
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import java.util.concurrent.atomic.AtomicInteger
object Database {
val counter = new AtomicInteger(0)
def fakeRequest(): Future[Int] = {
val result = counter.incrementAndGet()
Future {
if(result == 3) throw new RuntimeException("Fake exception")
result
}
}
}
case object PullCounter
case class PullResult(counter: Int)
case object PullFailed
class PullActor extends Actor {
val period = 2.seconds
var timerCancellable: Option[Cancellable] = None
def scheduleTimer() = {
timerCancellable = Some(
context.system.scheduler.scheduleOnce(
period, context.self, PullCounter
)
)
}
override def preStart() = scheduleTimer()
// so we don't call preStart and schedule a new message
// see http://doc.akka.io/docs/akka/2.2.4/scala/howto.html
override def postRestart(reason: Throwable) = {}
def receive = LoggingReceive {
case PullCounter =>
val fReq = Database.fakeRequest()
fReq.map(counter => PullResult(counter)) pipeTo self
fReq.onFailure{ case _ => self ! PullFailed }
case PullFailed =>
scheduleTimer()
case r: PullResult =>
if(r.counter >= 5) {
context.system.shutdown()
} else {
scheduleTimer()
}
}
}
object Example4 extends App {
val config = ConfigFactory.parseString(
"""
akka.loglevel = "DEBUG"
akka.actor.debug {
receive = on
lifecycle = on
}
""")
val system = ActorSystem("system", config)
system.actorOf(Props[PullActor], "pullActor")
system.awaitTermination()
}
Здесь мы как бы имеем довольно типичную ситуацию. Есть какой-то внешний источник данных, база данных, REST-сервис или вроде того. Ходить в него дорого и долго, да и сервис может оказаться временно недоступен. Поэтому мы заводим актор, который время от времени ходит за данными и кэширует их. В приведенном примере роль источника данных выполняет такая заглушка:
val counter = new AtomicInteger(0)
def fakeRequest(): Future[Int] = {
val result = counter.incrementAndGet()
Future {
if(result == 3) throw new RuntimeException("Fake exception")
result
}
}
}
На самом деле, нам не важно, что является источником, лишь бы при хождении в него использовались футуры. Как видите, футура из нашей заглушки не всегда завершается успешно.
При старте актор обращается к шедулеру и говорит «пошли мне сообщение PullCounter через две секунды»:
var timerCancellable: Option[Cancellable] = None
def scheduleTimer() = {
timerCancellable = Some(
context.system.scheduler.scheduleOnce(
period, context.self, PullCounter
)
)
}
override def preStart() = scheduleTimer()
Здесь мы не используем переменную timerCancellable но в общем случае с ее помощью посылку сообщения можно попытаться отменить:
При получении PullCounter актор осуществляет асинхронное хождение в «базу»:
val fReq = Database.fakeRequest()
fReq.map(counter => PullResult(counter)) pipeTo self
fReq.onFailure{ case _ => self ! PullFailed }
При получении ответа, обернутого в PullResult, актор планирует посылку нового сообщения PullCounter (заметьте, кусок про system.shutdown()
нужен просто чтобы когда-нибудь остановить пример):
if(r.counter >= 5) {
context.system.shutdown()
} else {
scheduleTimer()
}
Если же хождение в базу завершается неуспешно, срабатывает onFailure и актор получает сообщение PullFailed:
scheduleTimer()
Это очень важный момент, так как если бы мы его упустили, в случае ошибки актор перестал бы ходить в базу.
Следует отметить, что здесь мы использовали метод sheduleOnce и вручную планировали следующую посылку сообщения. Шедулер также имеет более простой метод shedule, позволяющий выполнять посылку сообщений регулярно без повторного планирования. Но в данном случае использовать его — плохая идея. Дело в том, что хождение в базу может занять больше заданного нами period’а в две секунды. В этом случае сообщения от шедулера начнут копиться в очереди актора, а в базу данных будет одновременно посылаться множество запросов, так как новый запрос будет создаваться до завершения предыдущего. Как результат, база ляжет, а очередь переполнится.
Если же в вашем приложении требуется именно shedule, соответствующий пример вы найдете здесь. Однако примите во внимание, что использование shedule не гарантирует вам регулярного получения заданного сообщения раз в N единиц времени. Во-первых, потому что при рестарте актора (например, из-за эксепшена) посылку сообщений придется запланировать снова, что как бы собьет весь ритм. Во-вторых, дэфолтная реализация шедулера проверяет, не пора ли послать какие-то сообщения, раз в некоторый интервал времени (который можно поменять в конфиге), поэтому особой пунктуальности здесь ожидать не следует.
И кстати, раз уж мы вспомнили про эксепшены, обратите внимание на код:
Он нужен для того, чтобы при перезапуске актора повторно не вызвался preStart. В зависимости от конкретного случая вы должны или не должны этого хотеть. Как следует обдумайте конкретную ситуацию и протестируйте все граничные случаи.
Вывод программы (лишние детали опущены):
[akka://system/user/pullActor] received handled message PullResult(1)
[akka://system/user/pullActor] received handled message PullCounter
[akka://system/user/pullActor] received handled message PullResult(2)
[akka://system/user/pullActor] received handled message PullCounter
[akka://system/user/pullActor] received handled message PullFailed
[akka://system/user/pullActor] received unhandled message Failure(...
[akka://system/user/pullActor] received handled message PullCounter
[akka://system/user/pullActor] received handled message PullResult(4)
[akka://system/user/pullActor] received handled message PullCounter
[akka://system/user/pullActor] received handled message PullResult(5)
[akka://system/user] stopping
[akka://system/user/pullActor] stopped
[akka://system/user] stopped
Надеюсь, вы заметили, как ловко мы избавились от необходимости писать логи вручную, воспользовавшись LoggingReceive и правильно настроив Akka?
Как видите, при использовании шедулера приходится учитывать массу деталей. Не расслабляйтесь!
Дополнение: Делаем метрики и мониторинг для Akka при помощи Kamon
Метки: Akka, Scala, Функциональное программирование.
Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.