← На главную

Пример обмена сообщениями через RabbitMQ на Scala

Сегодня мы рассмотрим простой пример организации pubsub поверх RabbitMQ. Как уже многократно отмечалось в предыдущих заметках, если вашим бэкендам нужно между собой как-то общаться, желательно взять готовую шину, а не писать по сути свою с нуля на каком-нибудь Akka Cluster. Если только, конечно, вы не пишите свой собственный RabbitMQ или там Spark.

По сути, вот что мы хотим получить:

case class PubSubMessage(topic: String, message: String) trait PubSubClient { def publish(topic: String, msg: String): Future[Unit] def subscribe(topic: String, ref: ActorRef): Future[Unit] def unsubscribe(topic: String, ref: ActorRef): Future[Unit] }

Есть топики и есть какие-то сообщения, которые мы публикуем в этих топиках. Акторы могут подписываться на топики и отписываться от них. При публикации сообщения в топике все подписанные на него акторы (в том числе акторы на других экземплярах бэкенда) получают PubSubMessage с этим сообщением.

Несмотря на простоту интерфейса, практика показывает, что с его помощью можно реализовать абсолютно все, что вы могли хотеть реализовать с использованием Akka Cluster. Гарантии на доставку сообщений будут ничем не хуже. Кроме того, приложение будет намного более предсказуемо вести себя при сетевых проблемах.

Переходим к реализации. Подцепляем к проекту клиент для RabbitMQ:

"com.rabbitmq" % "amqp-client" % "3.5.3"

Поскольку этот пакет предоставляет блокирующий API, нам понадобится создать отдельный dispatcher, прописав в application.conf:

pubsub-actor-dispatcher { type = Dispatcher executor = "fork-join-executor" fork-join-executor { parallelism-min = 1 parallelism-factor = 1.0 parallelism-max = 1 } }

Соответственно, актор, реализующий весь pubsub, будет запускаться так:

val props = { Props(new BroadcastPubSubClientActor(/* ... args ... */)) .withDispatcher("pubsub-actor-dispatcher") } val pubSubActorRef = system.actorOf(props, "pubSubClientActor")

Основные моменты в реализации BroadcastPubSubClientActor следующие.

private var chan: Channel = _ private var consumer: QueueingConsumer = _ private var pubsubQueueName: String = _ private var dummyQueueName: String = _ override def preStart(): Unit = { log.debug("preStart() called") context.self ! Connect } override def postRestart(e: Throwable): Unit = { log.warning(s"Reconnecting in $reconnectTimeoutMs ms") context.system.scheduler.scheduleOnce( reconnectTimeoutMs.millis, context.self, Connect ) } override def receive: Receive = { case Connect => log.debug("Connecting...") val connFactory = new ConnectionFactory() connFactory.setHost(host) connFactory.setPort(port) connFactory.setUsername(login) connFactory.setPassword(password) connFactory.setVirtualHost(vhost) val conn = connFactory.newConnection() chan = conn.createChannel() consumer = new QueueingConsumer(chan) dummyQueueName = chan.queueDeclare().getQueue pubsubQueueName = chan.queueDeclare().getQueue chan.basicConsume(pubsubQueueName, true, consumer) scheduleCheckDelivery() context.become(connected) }

Тут все довольно просто. Устанавливается соединение с RabbitMQ. Создается новый канал. Внутри одного соединения к RabbitMQ может быть много каналов, каждый из которых имеет свои подписки. Далее создается новая очередь, которую актор будет цеплять к интересующим его exchange’ам и из которой будет получать интересующие его сообщения. Также создается dummy-очередь, которая понадобится чуть ниже. Затем актор переходит в состояние connected. Можете на досуге проверить, что в случае разрыва соединения с RabbitMQ актор действительно будет пытаться время от времени его восстановить. В том числе, после перезапуска актор действительно возвращается назад к состоянию receive.

def connected: Receive = { case r: Publish => // PubSubClient.publish val exchange = exchangeName(r.topic) val fullMsgJson = JArray(List(JString(r.topic), JString(r.msg))) val fullMsgStr = compact(fullMsgJson) declareExchange(exchange) chan.basicPublish(exchange, r.topic, null, fullMsgStr.getBytes("UTF-8")) sender() ! {} case r: Subscribe => // PubSubClient.subscribe context.watch(r.ref) updateSubscribers(r.topic, s => s + r.ref) sender() ! {} case r: Unsubsribe => // PubSubClient.unsubscribe context.unwatch(r.ref) updateSubscribers(r.topic, s => s - r.ref) sender() ! {} case r: Terminated => for(topic <- subscriptions.keys) { updateSubscribers(topic, s => s - r.actor) }

Здесь происходит обработка основных сообщений. При получении Publish определяется имя соответствующего exchange’а и в него происходит публикация сообщения. Притом сообщение оборачивается в JSON и содержит внутри себя имя топика. Трудно сказать заранее, какая реализация будет лучше работать – с одним exchange’ом на все топики, по одному exchange’у на топик, или какой-то вариант посередине. Поэтому в данной реализации программист заранее задает желаемое количество exchange’ей, а отображение топика в exchange происходит по хэшу. Таким образом получаем общее решение.

При получении Subscribe актор делает watch подписчика и добавляет его в Map’у подписчиков на заданный топик. При получении Unsubsribe происходит обратное действие. Terminated приходит, если подписанный актор неожиданно умирает. В этом случае он отписывается от всех топиков.

case CheckDelivery => val nextDelivery = Option(consumer.nextDelivery(waitTimeoutMs)) nextDelivery match { case None => scheduleCheckDelivery() case Some(delivery) => val fullMsg = new String(delivery.getBody, "UTF-8") val JArray(List(JString(topic), JString(msg))) = parse(fullMsg) val pubSubMsg = PubSubMessage(topic, msg) for(ref <- subscriptions.getOrElse(topic, /* default */).refs) { ref ! pubSubMsg } context.self ! CheckDelivery }

Время от времени, например, раз в 10 мс, происходит проверка, нет ли в очереди новых сообщений. Если есть, из сообщения извлекается имя топика и происходит уведомление всех подписанных на этот топик акторов.

Интересно, как реализован метод declareExchange:

private def declareExchange(exchange: String): Unit = { chan.exchangeDeclare(exchange, "topic", false, true, messageTtlArgs) chan.queueBind(dummyQueueName, exchange, "", null) } private val messageTtlArgs: java.util.HashMap[String, AnyRef] = { val messageTtlMs = 60000 val args = new java.util.HashMap[String, AnyRef]() args.put("x-message-ttl", new Integer(messageTtlMs)) args }

Во-первых, поскольку здесь мы используем RabbitMQ исключительно как in-memory шину, указывается максимальное время жизни всех сообщений, так как хранить их вечно нет смысла. Во-вторых, к новому excahnge’у тут сразу же bind’ится та самая dummy очередь. Дело в том, что здесь мы создаем самоуничтожающийся exchange. Если не осталось ни одной очереди, забайнженой к exchange’у, он будет удален. Но хотя бы одна очередь должна быть забайнжена, иначе удаления не произойдет. Вот для того, чтобы RabbitMQ гарантированно за нами все почистил, когда мы закроем соединение, и нужна dummy очередь. Наконец, в-третьих, что еще интересно, мы должны на всякий случай делать declareExchange каждый раз, когда хотим что-то сделать с exchange’ем (см обработку Publish выше), так как мы не знаем заранее, существует ли уже exchange, а если он и был кем-то когда-то создан, не был ли он удален в результате автоматической чистки или другим клиентом. Кстати, из тех же соображений клиенты должны время от времени делать вызов pubsub.subscribe, а то вдруг сеть мигала и подписка уже потерялась.

Как видите, при использовании RabbitMQ нужно иметь ввиду некоторые нюансы его работы. Но в общем и целом, всем, кто работает с RabbitMQ, они известны и ничего супер сложного здесь нет. Полную версию исходного кода к этой заметке вы найдете в этом репозитории.

Вопросы, дополнения, а также любые мысли по теме и не очень, как обычно, приветствуются и могут быть оставлены в комментариях ниже.