← На главную

Самые основы работы с Akka Cluster

Как ранее уже отмечалось, Akka позволяет не только создавать акторы, которые обмениваются между собой сообщениями, но и строить кластеры, состоящие из нескольких физических машин. При этом акторы, запущенные на разных машинах, все еще могут взаимодействовать друг с другом. Кроме того, Akka из коробки предоставляет ряд полезных при построении распределенных приложений примитивов. Например, возможность подписаться на события, происходящие с кластером, присваивать узлам роли или запустить актор-одиночку. Что, кстати, делает Akka намного интереснее других реализаций модели акторов (Erlang, Cloud Haskell). В этой заметке мы напишем очень простое приложение, использующее akka-cluster, а также ознакомимся с его поведением при различных условиях.

Исходники к заметке вы найдете в этом репозитории.

Файл build.sbt:

name := "akka-cluster-example" version := "0.1" scalaVersion := "2.11.4" val akkaVersion = "2.3.8" libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-actor" % akkaVersion, "com.typesafe.akka" %% "akka-cluster" % akkaVersion ) resolvers += "Akka Snapshots" at "http://repo.akka.io/snapshots/"

Ничего особенного, к проекту просто подключаются пакеты akka-actor и akka-cluster.

Файл application.conf:

akka { actor { provider = "akka.cluster.ClusterActorRefProvider" } remote { log-remote-lifecycle-events = off netty.tcp { hostname = "127.0.0.1" port = 2551 } } cluster { seed-nodes = [ "akka.tcp://system@127.0.0.1:2551", "akka.tcp://system@127.0.0.1:2552"] } }

Значение akka.actor.provider заменяется, так как теперь мы с вами пишем распределенное приложение. Задается интерфейс и порт, на которых будет висеть actor system. Наконец, задается список seed nodes. Этот список используется при присоединении узла к кластеру. Seed nodes не являются единой точкой отказа или чем-то в этом роде. Просто одна из seed nodes должна быть онлайн в момент запуска узла, чтобы он успешно присоединился к кластеру. Seed nodes можно задавать не только при помощи конфига, но и при помощи аргументов командной строки:

java -Dakka.cluster.seed-nodes.0=akka.tcp://system@127.0.0.1:2551 \ -Dakka.cluster.seed-nodes.1=akka.tcp://system@127.0.0.1:2552 ...

Наконец, код самого приложения:

package me.eax.akka_examples import akka.cluster.ClusterEvent._ import akka.actor._ import akka.event._ import akka.cluster._ class ClusterListener extends Actor with ActorLogging { val cluster = Cluster(context.system) // subscribe to cluster changes, re-subscribe when restart override def preStart() { cluster.subscribe(self, InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember]) } override def postStop() { cluster.unsubscribe(self) } def receive = LoggingReceive { case MemberUp(member) => log.info(s"[Listener] node is up: $member") case UnreachableMember(member) => log.info(s"[Listener] node is unreachable: $member") case MemberRemoved(member, prevStatus) => log.info(s"[Listener] node is removed: $member") case ev: MemberEvent => log.info(s"[Listener] event: $ev") } } object AkkaClusterExample extends App { val system = ActorSystem("system") system.actorOf(Props[ClusterListener], "clusterListener") system.awaitTermination() }

Здесь мы создаем единственный актор, который подписывается на все события, происходящие с кластером. Эти события, как можно было легко догадаться, будут приходить актору в виде сообщений. Давайте запустим наше приложение и посмотрим, что это будут за сообщения:

[Remoting] Starting remoting [Remoting] Remoting started; listening on addresses: [akka.tcp:// system@127.0.0.1:2551] ... Node [akka.tcp://system@127.0.0.1:2551] is JOINING, roles [] Leader is moving node [akka.tcp://system@127.0.0.1:2551] to [Up] [Listener] node is up: Member(address = akka.tcp://system@ 127.0.0.1:2551, status = Up)

Как видите, было получено единственное сообщение – MemberUp, что не особо удивительно.

Теперь, не останавливая приложение, отредактируем application.conf:

netty.tcp { hostname = "127.0.0.1" port = 0 # 2551 }

Здесь port = 0 означает, что порт будет выбираться случайным образом. Запустим второй экземпляр приложения. В логах первого экземпляра увидим:

Node [akka.tcp://system@127.0.0.1:4982] is JOINING, roles [] Leader is moving node [akka.tcp://system@127.0.0.1:4982] to [Up] [Listener] node is up: Member(address = akka.tcp://system@ 127.0.0.1:4982, status = Up)

В логах второго:

[Remoting] Starting remoting [Remoting] Remoting started; listening on addresses :[akka.tcp:// system@127.0.0.1:4982] ... Cluster Node [akka.tcp://system@127.0.0.1:4982] - Welcome from [akka.tcp://system@127.0.0.1:2551] [Listener] node is up: Member(address = akka.tcp://system@ 127.0.0.1:2551, status = Up) [Listener] node is up: Member(address = akka.tcp://system@ 127.0.0.1:4982, status = Up)

Теперь остановим второй экземпляр. В логах первого увидим:

[Listener] node is unreachable: Member(address = akka.tcp://system@ 127.0.0.1:4982, status = Up)

Теперь попробуем кое-что новое. На сайте akka.io можно скачать так называемый akka distribution, это архив с именем вроде akka_2.11-2.3.8.zip. Помимо прочего в этом архиве можно найти утилиту akka-cluster, позволяющую получать различную информацию о кластере и управлять им при помощи JMX. Чтобы все работало, утилите нужно сделать chmod u+x, а также добавить путь до нее в $PATH. Кроме того, приложение должно быть запущено примерно с такими опциями:

java -Dcom.sun.management.jmxremote.port=9999 \ -Dcom.sun.management.jmxremote.authenticate=false \ -Dcom.sun.management.jmxremote.ssl=false ...

В IntelliJ IDEA все перечисленные опции можно прописать в Run → Edit Configurations… → AkkaClusterExample → Configuration → VM Options. Если теперь вернуть конфиг в изначальное состояние, запустить ноду и сказать:

akka-cluster 127.0.0.1 9999 cluster-status

… то вы увидите что-то вроде:

Querying cluster status { "self-address": "akka.tcp://system@127.0.0.1:2551", "members": [ { "address": "akka.tcp://system@127.0.0.1:2551", "status": "Up" } ], "unreachable": [ ] }

После запуска второй ноды, как мы это делали в прошлый раз (номер порта JMX также придется поменять):

Querying cluster status { "self-address": "akka.tcp://system@127.0.0.1:2551", "members": [ { "address": "akka.tcp://system@127.0.0.1:2551", "status": "Up" }, { "address": "akka.tcp://system@127.0.0.1:49603", "status": "Up" } ], "unreachable": [ ] }

После остановки второй ноды:

Querying cluster status { "self-address": "akka.tcp://system@127.0.0.1:2551", "members": [ { "address": "akka.tcp://system@127.0.0.1:2551", "status": "Up" }, { "address": "akka.tcp://system@127.0.0.1:49603", "status": "Up" } ], "unreachable": [ { "node": "akka.tcp://system@127.0.0.1:49603", "observed-by": [ "akka.tcp://system@127.0.0.1:2551" ] } ] }

Если теперь заглянуть в логи первой ноды, то вы увидите, как туда сыпятся ошибки вроде следующих:

Association with remote system [akka.tcp://system@127.0.0.1:49603] has failed, address is now gated for [5000] ms. Reason is: [Association failed with [akka.tcp://system@127.0.0.1:49603]].

По умолчанию Akka сама не помечает ноды, как упавшие (down). Узлы всего-навсего помечаются, как недоступные (unreachable). Если теперь снова поднять второй узел, в его логах вы увидите:

Association with remote system [akka.tcp://system@127.0.0.1:49603] has failed, address is now gated for [5000] ms. Reason is: [Association failed with [akka.tcp://system@127.0.0.1:49603]].

Вы не сможете добавить новые узлы в кластер, до тех пор, пока в кластере есть недоступные узлы. Казалось бы, чтобы решить проблему, просто запустим второй узел с тем же номером порта, что был использован в первый раз, а именно 49603. Как вы помните, номер порта можно задать явно, отредактировав application.conf. Однако это не сработает. В логах первой ноды будет сказано, почему:

Existing member [UniqueAddress(akka.tcp://system@127.0.0.1:49603, 414446918)] is trying to join, ignoring

Что мы на самом деле должны сделать – это вручную убрать ноду из кластера:

akka-cluster 127.0.0.1 9999 down akka.tcp://system@127.0.0.1:49603 akka-cluster 127.0.0.1 9999 cluster-status

Что увидим:

Querying cluster status { "self-address": "akka.tcp://system@127.0.0.1:2551", "members": [ { "address": "akka.tcp://system@127.0.0.1:2551", "status": "Up" }, { "address": "akka.tcp://system@127.0.0.1:47064", "status": "Up" } ], "unreachable": [ { "node": "akka.tcp://system@127.0.0.1:47064", "observed-by": [ "akka.tcp://system@127.0.0.1:2551" ] } ] }

У меня узел с портом 47064 оказался в списке unreachable, так как я уже остановил узел. Соответственно, его также следует вручную убрать из кластера. Если же вы не останавливали узел, у вас он, скорее всего, вполне успешно присоединится к кластеру.

Как вы догадываетесь, вручную убирать ноды из кластера не всегда удобно. Исправить ситуацию можно, добавив в application.conf после списка seed nodes опцию:

auto-down-unreachable-after = 10s

Если узел будет недоступен в течение заданного количества секунд, он автоматически будет удален из кластера. Можете повторить все проделанные нами ранее эксперименты с включенной опцией в качестве домашнего задания. Следует однако отметить, что бездумно включать эту опцию не следует, так как в ряде случаев она может привести к нежелательному поведению кластера, например, при нетсплитах. К сожалению, флага «сделать мне офигительно» в Akka не предусмотрено. Вам предлагается набор примитивов с некой семантикой, и только. Как правильно использовать эти примитивы, зависит от конкретного приложения.

Подробности о работе Akka Cluster вы найдете на страницах Cluster Specification и Cluster Usage официальной документации по Akka. Эта документация классная, не поленитесь с ней ознакомиться :)

А используете ли вы Akka Cluster и если да, то какие задачи с его помощью решаете?

Дополнение: Пример использования акторов-одиночек в Akka