Самые основы работы с Akka Cluster

4 февраля 2015

Как ранее уже отмечалось, Akka позволяет не только создавать акторы, которые обмениваются между собой сообщениями, но и строить кластеры, состоящие из нескольких физических машин. При этом акторы, запущенные на разных машинах, все еще могут взаимодействовать друг с другом. Кроме того, Akka из коробки предоставляет ряд полезных при построении распределенных приложений примитивов. Например, возможность подписаться на события, происходящие с кластером, присваивать узлам роли или запустить актор-одиночку. Что, кстати, делает Akka намного интереснее других реализаций модели акторов (Erlang, Cloud Haskell). В этой заметке мы напишем очень простое приложение, использующее akka-cluster, а также ознакомимся с его поведением при различных условиях.

Исходники к заметке вы найдете в этом репозитории.

Файл build.sbt:

name := "akka-cluster-example"

version := "0.1"

scalaVersion := "2.11.4"

val akkaVersion = "2.3.8"

libraryDependencies ++= Seq(
    "com.typesafe.akka" %% "akka-actor" % akkaVersion,
    "com.typesafe.akka" %% "akka-cluster" % akkaVersion
  )

resolvers += "Akka Snapshots" at "http://repo.akka.io/snapshots/"

Ничего особенного, к проекту просто подключаются пакеты akka-actor и akka-cluster.

Файл application.conf:

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }

  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2551
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://system@127.0.0.1:2551",
      "akka.tcp://system@127.0.0.1:2552"]
  }
}

Значение akka.actor.provider заменяется, так как теперь мы с вами пишем распределенное приложение. Задается интерфейс и порт, на которых будет висеть actor system. Наконец, задается список seed nodes. Этот список используется при присоединении узла к кластеру. Seed nodes не являются единой точкой отказа или чем-то в этом роде. Просто одна из seed nodes должна быть онлайн в момент запуска узла, чтобы он успешно присоединился к кластеру. Seed nodes можно задавать не только при помощи конфига, но и при помощи аргументов командной строки:

java -Dakka.cluster.seed-nodes.0=akka.tcp://system@127.0.0.1:2551 \
  -Dakka.cluster.seed-nodes.1=akka.tcp://system@127.0.0.1:2552 ...

Наконец, код самого приложения:

package me.eax.akka_examples

import akka.cluster.ClusterEvent._
import akka.actor._
import akka.event._
import akka.cluster._

class ClusterListener extends Actor with ActorLogging {

  val cluster = Cluster(context.system)

  // subscribe to cluster changes, re-subscribe when restart
  override def preStart() {
    cluster.subscribe(self, InitialStateAsEvents, classOf[MemberEvent],
                      classOf[UnreachableMember])
  }

  override def postStop() {
    cluster.unsubscribe(self)
  }

  def receive = LoggingReceive {
    case MemberUp(member) =>
      log.info(s"[Listener] node is up: $member")

    case UnreachableMember(member) =>
      log.info(s"[Listener] node is unreachable: $member")

    case MemberRemoved(member, prevStatus) =>
      log.info(s"[Listener] node is removed: $member")

    case ev: MemberEvent =>
      log.info(s"[Listener] event: $ev")
  }
}

object AkkaClusterExample extends App {
  val system = ActorSystem("system")
  system.actorOf(Props[ClusterListener], "clusterListener")
  system.awaitTermination()
}

Здесь мы создаем единственный актор, который подписывается на все события, происходящие с кластером. Эти события, как можно было легко догадаться, будут приходить актору в виде сообщений. Давайте запустим наше приложение и посмотрим, что это будут за сообщения:

[Remoting] Starting remoting
[Remoting] Remoting started; listening on addresses: [akka.tcp://
           system@127.0.0.1:2551]
...
Node [akka.tcp://system@127.0.0.1:2551] is JOINING, roles []
Leader is moving node [akka.tcp://system@127.0.0.1:2551] to [Up]
[Listener] node is up: Member(address = akka.tcp://system@
           127.0.0.1:2551, status = Up)

Как видите, было получено единственное сообщение — MemberUp, что не особо удивительно.

Теперь, не останавливая приложение, отредактируем application.conf:

    netty.tcp {
      hostname = "127.0.0.1"
      port = 0 # 2551
    }

Здесь port = 0 означает, что порт будет выбираться случайным образом. Запустим второй экземпляр приложения. В логах первого экземпляра увидим:

Node [akka.tcp://system@127.0.0.1:4982] is JOINING, roles []
Leader is moving node [akka.tcp://system@127.0.0.1:4982] to [Up]
[Listener] node is up: Member(address = akka.tcp://system@
           127.0.0.1:4982, status = Up)

В логах второго:

[Remoting] Starting remoting
[Remoting] Remoting started; listening on addresses :[akka.tcp://
           system@127.0.0.1:4982]
...
Cluster Node [akka.tcp://system@127.0.0.1:4982] - Welcome from
     [akka.tcp://system@127.0.0.1:2551]
[Listener] node is up: Member(address = akka.tcp://system@
           127.0.0.1:2551, status = Up)
[Listener] node is up: Member(address = akka.tcp://system@
           127.0.0.1:4982, status = Up)

Теперь остановим второй экземпляр. В логах первого увидим:

[Listener] node is unreachable: Member(address = akka.tcp://system@
           127.0.0.1:4982, status = Up)

Теперь попробуем кое-что новое. На сайте akka.io можно скачать так называемый akka distribution, это архив с именем вроде akka_2.11-2.3.8.zip. Помимо прочего в этом архиве можно найти утилиту akka-cluster, позволяющую получать различную информацию о кластере и управлять им при помощи JMX. Чтобы все работало, утилите нужно сделать chmod u+x, а также добавить путь до нее в $PATH. Кроме того, приложение должно быть запущено примерно с такими опциями:

java -Dcom.sun.management.jmxremote.port=9999 \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false ...

В IntelliJ IDEA все перечисленные опции можно прописать в Run → Edit Configurations… → AkkaClusterExample → Configuration → VM Options. Если теперь вернуть конфиг в изначальное состояние, запустить ноду и сказать:

akka-cluster 127.0.0.1 9999 cluster-status

… то вы увидите что-то вроде:

Querying cluster status
{
  "self-address": "akka.tcp://system@127.0.0.1:2551",
  "members": [
    {
      "address": "akka.tcp://system@127.0.0.1:2551",
      "status": "Up"
    }
  ],
  "unreachable": [
   
  ]
}

После запуска второй ноды, как мы это делали в прошлый раз (номер порта JMX также придется поменять):

Querying cluster status
{
  "self-address": "akka.tcp://system@127.0.0.1:2551",
  "members": [
    {
      "address": "akka.tcp://system@127.0.0.1:2551",
      "status": "Up"
    },
    {
      "address": "akka.tcp://system@127.0.0.1:49603",
      "status": "Up"
    }
  ],
  "unreachable": [
   
  ]
}

После остановки второй ноды:

Querying cluster status
{
  "self-address": "akka.tcp://system@127.0.0.1:2551",
  "members": [
    {
      "address": "akka.tcp://system@127.0.0.1:2551",
      "status": "Up"
    },
    {
      "address": "akka.tcp://system@127.0.0.1:49603",
      "status": "Up"
    }
  ],
  "unreachable": [
    {
      "node": "akka.tcp://system@127.0.0.1:49603",
      "observed-by": [
        "akka.tcp://system@127.0.0.1:2551"
      ]
    }
  ]
}

Если теперь заглянуть в логи первой ноды, то вы увидите, как туда сыпятся ошибки вроде следующих:

Association with remote system [akka.tcp://system@127.0.0.1:49603] has
  failed, address is now gated for [5000] ms. Reason is: [Association
  failed with [akka.tcp://system@127.0.0.1:49603]].

По умолчанию Akka сама не помечает ноды, как упавшие (down). Узлы всего-навсего помечаются, как недоступные (unreachable). Если теперь снова поднять второй узел, в его логах вы увидите:

Association with remote system [akka.tcp://system@127.0.0.1:49603] has
  failed, address is now gated for [5000] ms. Reason is: [Association
  failed with [akka.tcp://system@127.0.0.1:49603]].

Вы не сможете добавить новые узлы в кластер, до тех пор, пока в кластере есть недоступные узлы. Казалось бы, чтобы решить проблему, просто запустим второй узел с тем же номером порта, что был использован в первый раз, а именно 49603. Как вы помните, номер порта можно задать явно, отредактировав application.conf. Однако это не сработает. В логах первой ноды будет сказано, почему:

Existing member [UniqueAddress(akka.tcp://system@127.0.0.1:49603,
  414446918)] is trying to join, ignoring

Что мы на самом деле должны сделать — это вручную убрать ноду из кластера:

akka-cluster 127.0.0.1 9999 down akka.tcp://system@127.0.0.1:49603
akka-cluster 127.0.0.1 9999 cluster-status

Что увидим:

Querying cluster status
{
  "self-address": "akka.tcp://system@127.0.0.1:2551",
  "members": [
    {
      "address": "akka.tcp://system@127.0.0.1:2551",
      "status": "Up"
    },
    {
      "address": "akka.tcp://system@127.0.0.1:47064",
      "status": "Up"
    }
  ],
  "unreachable": [
    {
      "node": "akka.tcp://system@127.0.0.1:47064",
      "observed-by": [
        "akka.tcp://system@127.0.0.1:2551"
      ]
    }
  ]
}

У меня узел с портом 47064 оказался в списке unreachable, так как я уже остановил узел. Соответственно, его также следует вручную убрать из кластера. Если же вы не останавливали узел, у вас он, скорее всего, вполне успешно присоединится к кластеру.

Как вы догадываетесь, вручную убирать ноды из кластера не всегда удобно. Исправить ситуацию можно, добавив в application.conf после списка seed nodes опцию:

auto-down-unreachable-after = 10s

Если узел будет недоступен в течение заданного количества секунд, он автоматически будет удален из кластера. Можете повторить все проделанные нами ранее эксперименты с включенной опцией в качестве домашнего задания. Следует однако отметить, что бездумно включать эту опцию не следует, так как в ряде случаев она может привести к нежелательному поведению кластера, например, при нетсплитах. К сожалению, флага «сделать мне офигительно» в Akka не предусмотрено. Вам предлагается набор примитивов с некой семантикой, и только. Как правильно использовать эти примитивы, зависит от конкретного приложения.

Подробности о работе Akka Cluster вы найдете на страницах Cluster Specification и Cluster Usage официальной документации по Akka. Эта документация классная, не поленитесь с ней ознакомиться :)

А используете ли вы Akka Cluster и если да, то какие задачи с его помощью решаете?

Дополнение: Пример использования акторов-одиночек в Akka

Метки: , , .


Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.