Основы работы с акторами в Akka

6 ноября 2014

Akka — это библиотека для языков Java и Scala, в которой реализованы агенты, акторы, взаимодействие между акторами по сети и многие другие удобняшки. В первом приближении это напоминает Erlang или Cloud Haskell, но на самом деле Akka куда мощнее. Например, с помощью Akka можно легко объединить несколько машин в кластер, в котором будет отслеживаться исчезновение и появление машин, а акторы будут автоматически перемещаться с машины на машину по мере изменения размеров кластера. И это только один из примеров. В этой заметке мы напишем очень простое приложение, использующее Akka, чтобы показать, насколько легко работать с этой библиотекой.

Далее предполагается, что вы знаете, что такое модель акторов, а также, что вы умеете работать с футурами в Scala.

Откроем build.sbt и подключим Akka к нашему проекту:

name := "akka-examples"

version := "0.1"

scalaVersion := "2.11.4"

libraryDependencies +=
  "com.typesafe.akka" %% "akka-actor" % "2.3.6"

resolvers += "Akka Snapshots" at "http://repo.akka.io/snapshots/"

Напишем простейший актор, который хранит пары ключ-значение:

case class SetRequest(key: String, value: String)
case class GetRequest(key: String)
case class GetResponse(key: Option[String])

class MapActor extends Actor with ActorLogging {
  val state: mutable.Map[String, String] = mutable.Map.empty

  def receive = {
    case r: SetRequest =>
      state += r.key -> r.value
    case r: GetRequest =>
      sender ! GetResponse(state.get(r.key))
    case r =>
      log.warning(s"Unexpected: $r")
  }
}

Как видите, все довольно просто. Получая сообщение типа SetRequest, актор сохраняет в мапе пару ключ-значение. Если же актору приходит GetRequest, он находит в мапе значение по переданному ключу и посылает отправителю запроса GetResponse с найденным значением типа Option[String].

Для запуска акторов нам понадобится создать ActorSystem:

object Example2 extends App {
  val system = ActorSystem("system")
  val mainActor = system.actorOf(Props(new MainActor), "mainActor")
  system.awaitTermination()
}

Здесь мы создаем один-единственный актор MainActor и ждем завершения работы ActorSystem. MainActor определим следующим образом:

case object Start

class MainActor extends Actor with ActorLogging {
  implicit val timeout = Timeout(5 seconds)
  val mapActor = context.actorOf(Props(new MapActor), "mapActor")

  override def preStart() {
    self ! Start
  }

  def receive = {
    case Start =>
      mapActor ! "dummy request"
      mapActor ! SetRequest("key", "value")
      val respF = mapActor ? GetRequest("key")
      respF pipeTo self

    case r: GetResponse =>
      log.warning(s"Response: $r")
      context.system.shutdown()
  }
}

Как видите, здесь создается MapActor, который мы определили ранее. Также при запуске MainActor посылает сам себе сообщение Start (см метод preStart). При получении этого сообщения актор шлет MapActor’у три сообщения. Посылка первых двух сообщений происходит по принципу fire and forget. А вот при посылке запроса GetRequest актор ожидает получить какой-то ответ, поэтому вместо ! (произносится «tell» или «bang») используется оператор ? (произносится «ask»).

Последний возвращает футуру с ответом сервера respF. Таким образом, никакой блокировки не происходит, в отличии, например, от gen_server:call в Erlang. Далее с помощью комбинатора pipeTo мы пересылаем ответ, когда он станет доступен, сами себе. Само собой разумеется, pipeTo работает не только с self, но и с любыми другими акторами. Когда MainActor наконец-то получает ответ (GetResponse), он просто выводит его в лог и затем останавливает ActorSystem.

Кстати, именно для оператора ? нужен неявный (implicit) аргумент timeout.

Код программы целиком:

package me.eax.akka_examples

import akka.actor._
import akka.pattern.{ask, pipe}
import akka.util.Timeout

import scala.collection._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

case class SetRequest(key: String, value: String)
case class GetRequest(key: String)
case class GetResponse(key: Option[String])

class MapActor extends Actor with ActorLogging {
  val state: mutable.Map[String, String] = mutable.Map.empty

  def receive = {
    case r: SetRequest =>
      state += r.key -> r.value
    case r: GetRequest =>
      sender ! GetResponse(state.get(r.key))
    case r =>
      log.warning(s"Unexpected: $r")
  }
}

case object Start

class MainActor extends Actor with ActorLogging {
  implicit val timeout = Timeout(5 seconds)
  val mapActor = context.actorOf(Props(new MapActor), "mapActor")

  override def preStart() {
    self ! Start
  }

  def receive = {
    case Start =>
      mapActor ! "dummy request"
      mapActor ! SetRequest("key", "value")
      val respF = mapActor ? GetRequest("key")
      respF pipeTo self

    case r: GetResponse =>
      log.warning(s"Response: $r")
      context.system.shutdown()
  }
}

object Example2 extends App {
  val system = ActorSystem("system")
  val mainActor = system.actorOf(Props(new MainActor), "mainActor")
  system.awaitTermination()
}

Вывод программы (таймстемпы и информация о диспетчерах опущена):

[akka://system/user/mainActor/mapActor] Unexpected: dummy request
[akka://system/user/mainActor] Response: GetResponse(Some(value))

Как видите, ничего супер сложного. Подробнее об акторах в Akka вы можете прочитать в официальной документации (очень подробной, также есть в формате PDF). А еще вас может заинтересовать бонусная глава из чудной книги «Seven Concurrency Models in Seven Weeks», посвященная Scala и Akka. Данная глава распространяется бесплатно и доступна в формате PDF.

Дополнение: Добавляем интроспекцию в Akka при помощи 100 строк кода

Метки: , , .


Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.