Давайте напишем акторы на языке Go

13 июня 2019

Нет такой вещи, как идеальная модель многопоточности. Одни задачи хорошо ложатся на треды и мьютексы, другие на каналы и горутины (a.k.a CSP), третьи на акторную модель, четвертые на Software Transaction Memory. Из этих моделей язык Go предлагает первые две. Но, как мы сейчас убедимся на примере акторов, не представляет труда добавить в язык и другие модели.

Когда использовать акторы?

Актор представляет собой некоторое состояние и горутину, отвечающую за чтение и модификацию этого состояния. Взаимодействие между акторами осуществляется при помощи обмена сообщениями. Таким образом, в идеальной модели акторов отсутствует разделяемое состояние. На практике, конечно, ничто не мешает немного отклоняться от этого правила, если очень нужно.

Звучит как что-то очень похожее на CSP. Спрашивается, что же такого могут предложить акторы, чего не может CSP. Вопрос этот аналогичен вопросу «Любую программу можно написать на Си, тогда зачем нужны какие-то другие языки?».

Действительно, можно. Вопрос в том, что будет удобнее применить в данной конкретной ситуации. Если вы видите, что решаемая вами проблема хорошо ложится на CSP — супер, используйте CSP. Но иногда вы можете обнаружить, что состояние ваших горутин представляет собой конечный автомат, и что состояние меняется в зависимости от приходящих сообщений. При этом в разном состоянии требуется по-разному реагировать на одни и те же сообщения, а обработку некоторых вообще требуется отложить до смены состояния. Плюс к этому некоторые сообщения требуется обрабатывать вне очереди, а некоторые передавать между разными физическими машинами. При этом длины очередей неплохо бы мониторить, предусмотреть какой-то механизм back pressure на случай переполнения очередей, и так далее. Тогда фреймворк, реализующий акторную модель, тупо избавит вас от написания лишнего кода, поскольку даст вам все это в готовом виде.

Менее очевидное преимущество акторной модели состоит в следующем. Для поддержки действительно сложных систем требуется подробная документация того, как система работает. При этом разработчики в среднем по больнице не очень склонны писать такую документацию. А если они ее и пишут, то не совсем понятно, как следить за тем, чтобы документация соответствовала текущему состоянию кода. При использовании акторной модели система автоматически разбивается на простые и понятные сущности — акторы, состояния акторов с графом переходов между ними, а также сообщения. По каждой сущности пишется подробный комментарий в стиле «этот актор отвечает за … принимает сообщения … и посылает сообщения …», «данное сообщение посылается актору … когда … и состоит из полей …», и так далее. А за актуальностью комментариев разработчики обычно следят. Кстати, все это отлично описывается в PlantUML.

Что важно, вы явно контролируете и описываете все возможные состояния. За счет этого существенно снижается вероятность возникновения ситуации вроде «ой, мы не подумали, что находясь в таком-то состоянии можем получить такое-то сообщение». По моему ничего не доказывающему опыту, такое постоянно случается при использовании прочих моделей, особенно тредово-мьютексной.

Есть у акторной модели и минусы. Во-первых, она имеет некоторый оверхед по сравнению с тредами и мьютексами. Но если вы уже пишите на Go, скорее всего, это не является для вас существенной проблемой. Во-вторых, обмен сообщениями между акторами является нетипизированным. Соответственно, возрастает вероятность получения ошибок в рантайме, а не при компиляции. Данная проблема решается написанием типизированных интерфейсов-клиентов для конкретных акторов. И в-третьих, нужно следить за длинами очередей сообщений и предусматривать какие-то механизмы back pressure. К счастью, данные механизмы легко реализуются.

Имплементация акторов на Go

Существуют готовые акторные фреймворки на Go, например, Proto.Actor. Но беглый анализ данного фреймворка показал, что он довольно развесистый, и не факт, что решает именно те задачи, которые мне было нужно. Альтернативные же реализации выглядели просто игрушечными. Поэтому было решено попробовать написать собственный небольшой фреймворк. Как оказалось, занимает это пару вечеров, а объем кода составляет ~300 строк кода, если не считать тестов.

Основными типами являются следующие (см также страницу на GoDoc):

// Message represents a message sent between actors.
type Message interface {}

// Pid is an unique ID of an Actor.
type Pid uint64

// Actor is an entity that processes messages, sends messages to
// other actors and stores some state.
type Actor interface {
  // Receive is called when Actor receives a new Message.
  // Returns a new Actor state and/or error.
  Receive(message Message) (Actor, error)
}

// UnexpectedMessage is returned by Actor.Receive when the actor didn't
// expect a given message.
var UnexpectedMessage = fmt.Errorf("Unexpected message")

// Stash is returned by Actor.Receive when actor wants to delay the
// processing of the message until the next actor state change.
var Stash = fmt.Errorf("Stash")

// Terminate is returned by Actor.Receive when actor wants to terminate.
var Terminate = fmt.Errorf("Terminate")

// Constructor is a procedure that creates a new Actor.
// It is called when Actor is created, before receiving any Messages.
// `state` sets the initial actor state. If `limit` is > 0 it sets the
// maximum size of the mailbox (only for regular, not stashed, not
// priority messages).
type Constructor func(system System, pid Pid) (state Actor, limit int)

// System is a class responsible for creating, scheduling and otherwise
// controlling actors.
type System interface {
  // Spawn creates a new Actor and returns it's Pid.
  Spawn(constructor Constructor) Pid

  // Send sends a Message to the Actor with a given Pid. InvalidPid is
  // returned if actor with a given Pid doesn't exist or was
  // terminated.
  Send(pid Pid, message Message) error

  // SendPriority sends a priority Message to the Actor with a given Pid.
  // Priority messages are processed before any other messages.
  // InvalidPid is returned if actor with a given Pid doesn't exist
  // or was terminated.
  SendPriority(pid Pid, message Message) error

  // AwaitTermination returns when all spawned Actors terminate.
  AwaitTermination()
}

// InvalidPid is returned by System.Send when actor with a given Pid
// doesn't exist (e.g. it was terminated).
var InvalidPid = fmt.Errorf("Invalid actor Pid")

// MailboxFull is returned by System.Send if actor's Mailbox is full.
var MailboxFull = fmt.Errorf("Mailbox is full")

Здесь я старался следовать принципам литературного программирования, чтобы код был понятен при прочтении сверху вниз. Надеюсь, что получилось.

Рассмотрим один из тестов, демонстрирующий типичное использование фреймворка:

type PingPongActor struct {
  system actor.System
  pid actor.Pid
  pong_received chan struct{}
}

type PingMessage struct {
  from actor.Pid
}

type PongMessage struct {
  from actor.Pid
}

type SendPingMessage struct {
  to actor.Pid
  pong_received chan struct{}
}

type TerminateMessage struct {}

func (a *PingPongActor) Receive(message actor.Message)
  (actor.Actor, error) {
  switch v := message.(type) {
  case SendPingMessage:
    a.pong_received = v.pong_received
    _ = a.system.Send(v.to, PingMessage{from: a.pid})
    return a, nil
  case PingMessage:
    _ = a.system.Send(v.from, PongMessage{from: a.pid})
    return a, nil
  case PongMessage:
    a.pong_received <- struct{}{}
    return a, nil
  case TerminateMessage:
    return a, errors.Terminate
  default:
    return a, fmt.Errorf("Don't know what to do with %T", v)
  }
}

func newPingPongActor(system actor.System, pid actor.Pid)
  (state actor.Actor, limit int) {
  state = &PingPongActor{
    system: system,
    pid: pid,
  }
  return
}

func TestPingPong(t *testing.T) {
  system := New()
  pong_received := make(chan struct{}, 1)
  pid1 := system.Spawn(newPingPongActor)
  pid2 := system.Spawn(newPingPongActor)
  err := system.Send(pid1, SendPingMessage{
    to: pid2,
    pong_received: pong_received
  })
  require.NoError(t, err)
  <-pong_received
  err = system.Send(pid1, TerminateMessage{})
  require.NoError(t, err)
  err = system.Send(pid2, TerminateMessage{})
  require.NoError(t, err)
  system.AwaitTermination()
}

Здесь создается два актора PingPongActor. Первому посылается сообщение SendPingMessage, принимая которое он шлет PingMessage второму актору. Получив это сообщение, второй актор отвечает первому сообщением PongMessage. Получив это сообщение, первый актор записывает пустую структуру в канал pong_received, давая тесту понять, что все отработало в соответствии с нашими ожиданиями.

В данном примере метод Receive всегда возвращает структуру PingPongActor. Но в общем случае ничто не мешает возвращать какую-то другую структуру, реализующую интерфейс Actor. Таким образом может осуществляться переход актора между различными состояниями (аналог FSM в Erlang). Если актор хочет отложить обработку сообщения, он может вернуть errors.Stash. Отложенные сообщения автоматически помещаются в начало очереди при смене состояния актора (возвращении другой реализации Actor). Таким образом реализуется аналог stash/unstash из Akka. Пример использования этих возможностей вы найдете в тесте TestSystemStashUnstash.

В тесте TestSystemSendPriority вы найдете пример приоритетной посылки сообщений. Тест TestSystemMailboxFull демонстрирует работу механизма back pressure. В деталях рассматривать здесь всю реализацию фреймворка, как мне кажется, было бы скучновато. Заинтересованные читатели могут ознакомиться с содержимым соответствующего репозитория на GitHub.

Заключение

Во фреймворке не были реализованы супервизоры и мониторы. Поскольку в Go нет исключений и вся обработка ошибок осуществляется явно, я не уверен, что в данных механизмах есть необходимость. Также не был реализован какой-либо сетевой транспорт. Потребность в нем есть не во всех приложениях, и реализация такого транспорта — достаточно большая задача. Возможно, он появится в будущих версиях библиотеки и, соответственно, будет описан в отдельном посте.

А что вы думаете о модели акторов и о том, нужна ли она в языке Go?

Метки: , .

Понравился пост? Узнайте, как можно поддержать развитие этого блога.

Также подпишитесь на RSS, Facebook, ВКонтакте, Twitter или Telegram.