Пример обмена сообщениями через RabbitMQ на Scala

7 сентября 2015

Сегодня мы рассмотрим простой пример организации pubsub поверх RabbitMQ. Как уже многократно отмечалось в предыдущих заметках, если вашим бэкендам нужно между собой как-то общаться, желательно взять готовую шину, а не писать по сути свою с нуля на каком-нибудь Akka Cluster. Если только, конечно, вы не пишите свой собственный RabbitMQ или там Spark.

По сути, вот что мы хотим получить:

case class PubSubMessage(topic: String, message: String)

trait PubSubClient {
  def publish(topic: String, msg: String): Future[Unit]
  def subscribe(topic: String, ref: ActorRef): Future[Unit]
  def unsubscribe(topic: String, ref: ActorRef): Future[Unit]
}

Есть топики и есть какие-то сообщения, которые мы публикуем в этих топиках. Акторы могут подписываться на топики и отписываться от них. При публикации сообщения в топике все подписанные на него акторы (в том числе акторы на других экземплярах бэкенда) получают PubSubMessage с этим сообщением.

Несмотря на простоту интерфейса, практика показывает, что с его помощью можно реализовать абсолютно все, что вы могли хотеть реализовать с использованием Akka Cluster. Гарантии на доставку сообщений будут ничем не хуже. Кроме того, приложение будет намного более предсказуемо вести себя при сетевых проблемах.

Переходим к реализации. Подцепляем к проекту клиент для RabbitMQ:

"com.rabbitmq" % "amqp-client" % "3.5.3"

Поскольку этот пакет предоставляет блокирующий API, нам понадобится создать отдельный dispatcher, прописав в application.conf:

pubsub-actor-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 1
    parallelism-factor = 1.0
    parallelism-max = 1
  }
}

Соответственно, актор, реализующий весь pubsub, будет запускаться так:

val props = {
  Props(new BroadcastPubSubClientActor(/* ... args ... */))
    .withDispatcher("pubsub-actor-dispatcher")
}

val pubSubActorRef = system.actorOf(props, "pubSubClientActor")

Основные моменты в реализации BroadcastPubSubClientActor следующие.

  private var chan: Channel = _
  private var consumer: QueueingConsumer = _
  private var pubsubQueueName: String = _
  private var dummyQueueName: String = _

  override def preStart(): Unit = {
    log.debug("preStart() called")
    context.self ! Connect
  }

  override def postRestart(e: Throwable): Unit = {
    log.warning(s"Reconnecting in $reconnectTimeoutMs ms")
    context.system.scheduler.scheduleOnce(
      reconnectTimeoutMs.millis, context.self, Connect
    )
  }

  override def receive: Receive = {
    case Connect =>
      log.debug("Connecting...")

      val connFactory = new ConnectionFactory()
      connFactory.setHost(host)
      connFactory.setPort(port)
      connFactory.setUsername(login)
      connFactory.setPassword(password)
      connFactory.setVirtualHost(vhost)

      val conn = connFactory.newConnection()

      chan = conn.createChannel()
      consumer = new QueueingConsumer(chan)
      dummyQueueName = chan.queueDeclare().getQueue
      pubsubQueueName = chan.queueDeclare().getQueue
      chan.basicConsume(pubsubQueueName, true, consumer)

      scheduleCheckDelivery()
      context.become(connected)
  }

Тут все довольно просто. Устанавливается соединение с RabbitMQ. Создается новый канал. Внутри одного соединения к RabbitMQ может быть много каналов, каждый из которых имеет свои подписки. Далее создается новая очередь, которую актор будет цеплять к интересующим его exchange’ам и из которой будет получать интересующие его сообщения. Также создается dummy-очередь, которая понадобится чуть ниже. Затем актор переходит в состояние connected. Можете на досуге проверить, что в случае разрыва соединения с RabbitMQ актор действительно будет пытаться время от времени его восстановить. В том числе, после перезапуска актор действительно возвращается назад к состоянию receive.

  def connected: Receive = {
    case r: Publish => // PubSubClient.publish
      val exchange = exchangeName(r.topic)
      val fullMsgJson = JArray(List(JString(r.topic), JString(r.msg)))
      val fullMsgStr = compact(fullMsgJson)

      declareExchange(exchange)

      chan.basicPublish(exchange, r.topic, null,
        fullMsgStr.getBytes("UTF-8"))
      sender() ! {}

    case r: Subscribe => // PubSubClient.subscribe
      context.watch(r.ref)
      updateSubscribers(r.topic, s => s + r.ref)
      sender() ! {}

    case r: Unsubsribe => // PubSubClient.unsubscribe
      context.unwatch(r.ref)
      updateSubscribers(r.topic, s => s - r.ref)
      sender() ! {}

    case r: Terminated =>
      for(topic <- subscriptions.keys) {
        updateSubscribers(topic, s => s - r.actor)
      }

Здесь происходит обработка основных сообщений. При получении Publish определяется имя соответствующего exchange’а и в него происходит публикация сообщения. Притом сообщение оборачивается в JSON и содержит внутри себя имя топика. Трудно сказать заранее, какая реализация будет лучше работать — с одним exchange’ом на все топики, по одному exchange’у на топик, или какой-то вариант посередине. Поэтому в данной реализации программист заранее задает желаемое количество exchange’ей, а отображение топика в exchange происходит по хэшу. Таким образом получаем общее решение.

При получении Subscribe актор делает watch подписчика и добавляет его в Map’у подписчиков на заданный топик. При получении Unsubsribe происходит обратное действие. Terminated приходит, если подписанный актор неожиданно умирает. В этом случае он отписывается от всех топиков.

case CheckDelivery =>
  val nextDelivery = Option(consumer.nextDelivery(waitTimeoutMs))
  nextDelivery match {
    case None => scheduleCheckDelivery()
    case Some(delivery) =>
      val fullMsg = new String(delivery.getBody, "UTF-8")
      val JArray(List(JString(topic), JString(msg))) = parse(fullMsg)
      val pubSubMsg = PubSubMessage(topic, msg)
      for(ref <- subscriptions.getOrElse(topic, /* default */).refs) {
        ref ! pubSubMsg
      }
      context.self ! CheckDelivery
  }

Время от времени, например, раз в 10 мс, происходит проверка, нет ли в очереди новых сообщений. Если есть, из сообщения извлекается имя топика и происходит уведомление всех подписанных на этот топик акторов.

Интересно, как реализован метод declareExchange:

private def declareExchange(exchange: String): Unit = {
  chan.exchangeDeclare(exchange, "topic", false, true, messageTtlArgs)
  chan.queueBind(dummyQueueName, exchange, "", null)
}

private val messageTtlArgs: java.util.HashMap[String, AnyRef] = {
  val messageTtlMs = 60000
  val args = new java.util.HashMap[String, AnyRef]()
  args.put("x-message-ttl", new Integer(messageTtlMs))
  args
}

Во-первых, поскольку здесь мы используем RabbitMQ исключительно как in-memory шину, указывается максимальное время жизни всех сообщений, так как хранить их вечно нет смысла. Во-вторых, к новому excahnge’у тут сразу же bind’ится та самая dummy очередь. Дело в том, что здесь мы создаем самоуничтожающийся exchange. Если не осталось ни одной очереди, забайнженой к exchange’у, он будет удален. Но хотя бы одна очередь должна быть забайнжена, иначе удаления не произойдет. Вот для того, чтобы RabbitMQ гарантированно за нами все почистил, когда мы закроем соединение, и нужна dummy очередь. Наконец, в-третьих, что еще интересно, мы должны на всякий случай делать declareExchange каждый раз, когда хотим что-то сделать с exchange’ем (см обработку Publish выше), так как мы не знаем заранее, существует ли уже exchange, а если он и был кем-то когда-то создан, не был ли он удален в результате автоматической чистки или другим клиентом. Кстати, из тех же соображений клиенты должны время от времени делать вызов pubsub.subscribe, а то вдруг сеть мигала и подписка уже потерялась.

Как видите, при использовании RabbitMQ нужно иметь ввиду некоторые нюансы его работы. Но в общем и целом, всем, кто работает с RabbitMQ, они известны и ничего супер сложного здесь нет. Полную версию исходного кода к этой заметке вы найдете в этом репозитории.

Вопросы, дополнения, а также любые мысли по теме и не очень, как обычно, приветствуются и могут быть оставлены в комментариях ниже.

Метки: , .


Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.