Самые основы работы с Akka Cluster
4 февраля 2015
Как ранее уже отмечалось, Akka позволяет не только создавать акторы, которые обмениваются между собой сообщениями, но и строить кластеры, состоящие из нескольких физических машин. При этом акторы, запущенные на разных машинах, все еще могут взаимодействовать друг с другом. Кроме того, Akka из коробки предоставляет ряд полезных при построении распределенных приложений примитивов. Например, возможность подписаться на события, происходящие с кластером, присваивать узлам роли или запустить актор-одиночку. Что, кстати, делает Akka намного интереснее других реализаций модели акторов (Erlang, Cloud Haskell). В этой заметке мы напишем очень простое приложение, использующее akka-cluster, а также ознакомимся с его поведением при различных условиях.
Исходники к заметке вы найдете в этом репозитории.
Файл build.sbt:
version := "0.1"
scalaVersion := "2.11.4"
val akkaVersion = "2.3.8"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster" % akkaVersion
)
resolvers += "Akka Snapshots" at "http://repo.akka.io/snapshots/"
Ничего особенного, к проекту просто подключаются пакеты akka-actor и akka-cluster.
Файл application.conf:
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
seed-nodes = [
"akka.tcp://system@127.0.0.1:2551",
"akka.tcp://system@127.0.0.1:2552"]
}
}
Значение akka.actor.provider заменяется, так как теперь мы с вами пишем распределенное приложение. Задается интерфейс и порт, на которых будет висеть actor system. Наконец, задается список seed nodes. Этот список используется при присоединении узла к кластеру. Seed nodes не являются единой точкой отказа или чем-то в этом роде. Просто одна из seed nodes должна быть онлайн в момент запуска узла, чтобы он успешно присоединился к кластеру. Seed nodes можно задавать не только при помощи конфига, но и при помощи аргументов командной строки:
-Dakka.cluster.seed-nodes.1=akka.tcp://system@127.0.0.1:2552 ...
Наконец, код самого приложения:
import akka.cluster.ClusterEvent._
import akka.actor._
import akka.event._
import akka.cluster._
class ClusterListener extends Actor with ActorLogging {
val cluster = Cluster(context.system)
// subscribe to cluster changes, re-subscribe when restart
override def preStart() {
cluster.subscribe(self, InitialStateAsEvents, classOf[MemberEvent],
classOf[UnreachableMember])
}
override def postStop() {
cluster.unsubscribe(self)
}
def receive = LoggingReceive {
case MemberUp(member) =>
log.info(s"[Listener] node is up: $member")
case UnreachableMember(member) =>
log.info(s"[Listener] node is unreachable: $member")
case MemberRemoved(member, prevStatus) =>
log.info(s"[Listener] node is removed: $member")
case ev: MemberEvent =>
log.info(s"[Listener] event: $ev")
}
}
object AkkaClusterExample extends App {
val system = ActorSystem("system")
system.actorOf(Props[ClusterListener], "clusterListener")
system.awaitTermination()
}
Здесь мы создаем единственный актор, который подписывается на все события, происходящие с кластером. Эти события, как можно было легко догадаться, будут приходить актору в виде сообщений. Давайте запустим наше приложение и посмотрим, что это будут за сообщения:
[Remoting] Remoting started; listening on addresses: [akka.tcp://
system@127.0.0.1:2551]
...
Node [akka.tcp://system@127.0.0.1:2551] is JOINING, roles []
Leader is moving node [akka.tcp://system@127.0.0.1:2551] to [Up]
[Listener] node is up: Member(address = akka.tcp://system@
127.0.0.1:2551, status = Up)
Как видите, было получено единственное сообщение — MemberUp, что не особо удивительно.
Теперь, не останавливая приложение, отредактируем application.conf:
hostname = "127.0.0.1"
port = 0 # 2551
}
Здесь port = 0
означает, что порт будет выбираться случайным образом. Запустим второй экземпляр приложения. В логах первого экземпляра увидим:
Leader is moving node [akka.tcp://system@127.0.0.1:4982] to [Up]
[Listener] node is up: Member(address = akka.tcp://system@
127.0.0.1:4982, status = Up)
В логах второго:
[Remoting] Remoting started; listening on addresses :[akka.tcp://
system@127.0.0.1:4982]
...
Cluster Node [akka.tcp://system@127.0.0.1:4982] - Welcome from
[akka.tcp://system@127.0.0.1:2551]
[Listener] node is up: Member(address = akka.tcp://system@
127.0.0.1:2551, status = Up)
[Listener] node is up: Member(address = akka.tcp://system@
127.0.0.1:4982, status = Up)
Теперь остановим второй экземпляр. В логах первого увидим:
127.0.0.1:4982, status = Up)
Теперь попробуем кое-что новое. На сайте akka.io можно скачать так называемый akka distribution, это архив с именем вроде akka_2.11-2.3.8.zip. Помимо прочего в этом архиве можно найти утилиту akka-cluster, позволяющую получать различную информацию о кластере и управлять им при помощи JMX. Чтобы все работало, утилите нужно сделать chmod u+x
, а также добавить путь до нее в $PATH. Кроме того, приложение должно быть запущено примерно с такими опциями:
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false ...
В IntelliJ IDEA все перечисленные опции можно прописать в Run → Edit Configurations… → AkkaClusterExample → Configuration → VM Options. Если теперь вернуть конфиг в изначальное состояние, запустить ноду и сказать:
… то вы увидите что-то вроде:
{
"self-address": "akka.tcp://system@127.0.0.1:2551",
"members": [
{
"address": "akka.tcp://system@127.0.0.1:2551",
"status": "Up"
}
],
"unreachable": [
]
}
После запуска второй ноды, как мы это делали в прошлый раз (номер порта JMX также придется поменять):
{
"self-address": "akka.tcp://system@127.0.0.1:2551",
"members": [
{
"address": "akka.tcp://system@127.0.0.1:2551",
"status": "Up"
},
{
"address": "akka.tcp://system@127.0.0.1:49603",
"status": "Up"
}
],
"unreachable": [
]
}
После остановки второй ноды:
{
"self-address": "akka.tcp://system@127.0.0.1:2551",
"members": [
{
"address": "akka.tcp://system@127.0.0.1:2551",
"status": "Up"
},
{
"address": "akka.tcp://system@127.0.0.1:49603",
"status": "Up"
}
],
"unreachable": [
{
"node": "akka.tcp://system@127.0.0.1:49603",
"observed-by": [
"akka.tcp://system@127.0.0.1:2551"
]
}
]
}
Если теперь заглянуть в логи первой ноды, то вы увидите, как туда сыпятся ошибки вроде следующих:
failed, address is now gated for [5000] ms. Reason is: [Association
failed with [akka.tcp://system@127.0.0.1:49603]].
По умолчанию Akka сама не помечает ноды, как упавшие (down). Узлы всего-навсего помечаются, как недоступные (unreachable). Если теперь снова поднять второй узел, в его логах вы увидите:
failed, address is now gated for [5000] ms. Reason is: [Association
failed with [akka.tcp://system@127.0.0.1:49603]].
Вы не сможете добавить новые узлы в кластер, до тех пор, пока в кластере есть недоступные узлы. Казалось бы, чтобы решить проблему, просто запустим второй узел с тем же номером порта, что был использован в первый раз, а именно 49603. Как вы помните, номер порта можно задать явно, отредактировав application.conf. Однако это не сработает. В логах первой ноды будет сказано, почему:
414446918)] is trying to join, ignoring
Что мы на самом деле должны сделать — это вручную убрать ноду из кластера:
akka-cluster 127.0.0.1 9999 cluster-status
Что увидим:
{
"self-address": "akka.tcp://system@127.0.0.1:2551",
"members": [
{
"address": "akka.tcp://system@127.0.0.1:2551",
"status": "Up"
},
{
"address": "akka.tcp://system@127.0.0.1:47064",
"status": "Up"
}
],
"unreachable": [
{
"node": "akka.tcp://system@127.0.0.1:47064",
"observed-by": [
"akka.tcp://system@127.0.0.1:2551"
]
}
]
}
У меня узел с портом 47064 оказался в списке unreachable, так как я уже остановил узел. Соответственно, его также следует вручную убрать из кластера. Если же вы не останавливали узел, у вас он, скорее всего, вполне успешно присоединится к кластеру.
Как вы догадываетесь, вручную убирать ноды из кластера не всегда удобно. Исправить ситуацию можно, добавив в application.conf после списка seed nodes опцию:
Если узел будет недоступен в течение заданного количества секунд, он автоматически будет удален из кластера. Можете повторить все проделанные нами ранее эксперименты с включенной опцией в качестве домашнего задания. Следует однако отметить, что бездумно включать эту опцию не следует, так как в ряде случаев она может привести к нежелательному поведению кластера, например, при нетсплитах. К сожалению, флага «сделать мне офигительно» в Akka не предусмотрено. Вам предлагается набор примитивов с некой семантикой, и только. Как правильно использовать эти примитивы, зависит от конкретного приложения.
Подробности о работе Akka Cluster вы найдете на страницах Cluster Specification и Cluster Usage официальной документации по Akka. Эта документация классная, не поленитесь с ней ознакомиться :)
А используете ли вы Akka Cluster и если да, то какие задачи с его помощью решаете?
Дополнение: Пример использования акторов-одиночек в Akka
Метки: Akka, Scala, Функциональное программирование.
Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.