Пример обмена сообщениями через RabbitMQ на Scala
7 сентября 2015
Сегодня мы рассмотрим простой пример организации pubsub поверх RabbitMQ. Как уже многократно отмечалось в предыдущих заметках, если вашим бэкендам нужно между собой как-то общаться, желательно взять готовую шину, а не писать по сути свою с нуля на каком-нибудь Akka Cluster. Если только, конечно, вы не пишите свой собственный RabbitMQ или там Spark.
По сути, вот что мы хотим получить:
trait PubSubClient {
def publish(topic: String, msg: String): Future[Unit]
def subscribe(topic: String, ref: ActorRef): Future[Unit]
def unsubscribe(topic: String, ref: ActorRef): Future[Unit]
}
Есть топики и есть какие-то сообщения, которые мы публикуем в этих топиках. Акторы могут подписываться на топики и отписываться от них. При публикации сообщения в топике все подписанные на него акторы (в том числе акторы на других экземплярах бэкенда) получают PubSubMessage с этим сообщением.
Несмотря на простоту интерфейса, практика показывает, что с его помощью можно реализовать абсолютно все, что вы могли хотеть реализовать с использованием Akka Cluster. Гарантии на доставку сообщений будут ничем не хуже. Кроме того, приложение будет намного более предсказуемо вести себя при сетевых проблемах.
Переходим к реализации. Подцепляем к проекту клиент для RabbitMQ:
Поскольку этот пакет предоставляет блокирующий API, нам понадобится создать отдельный dispatcher, прописав в application.conf:
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 1
parallelism-factor = 1.0
parallelism-max = 1
}
}
Соответственно, актор, реализующий весь pubsub, будет запускаться так:
Props(new BroadcastPubSubClientActor(/* ... args ... */))
.withDispatcher("pubsub-actor-dispatcher")
}
val pubSubActorRef = system.actorOf(props, "pubSubClientActor")
Основные моменты в реализации BroadcastPubSubClientActor следующие.
private var consumer: QueueingConsumer = _
private var pubsubQueueName: String = _
private var dummyQueueName: String = _
override def preStart(): Unit = {
log.debug("preStart() called")
context.self ! Connect
}
override def postRestart(e: Throwable): Unit = {
log.warning(s"Reconnecting in $reconnectTimeoutMs ms")
context.system.scheduler.scheduleOnce(
reconnectTimeoutMs.millis, context.self, Connect
)
}
override def receive: Receive = {
case Connect =>
log.debug("Connecting...")
val connFactory = new ConnectionFactory()
connFactory.setHost(host)
connFactory.setPort(port)
connFactory.setUsername(login)
connFactory.setPassword(password)
connFactory.setVirtualHost(vhost)
val conn = connFactory.newConnection()
chan = conn.createChannel()
consumer = new QueueingConsumer(chan)
dummyQueueName = chan.queueDeclare().getQueue
pubsubQueueName = chan.queueDeclare().getQueue
chan.basicConsume(pubsubQueueName, true, consumer)
scheduleCheckDelivery()
context.become(connected)
}
Тут все довольно просто. Устанавливается соединение с RabbitMQ. Создается новый канал. Внутри одного соединения к RabbitMQ может быть много каналов, каждый из которых имеет свои подписки. Далее создается новая очередь, которую актор будет цеплять к интересующим его exchange’ам и из которой будет получать интересующие его сообщения. Также создается dummy-очередь, которая понадобится чуть ниже. Затем актор переходит в состояние connected. Можете на досуге проверить, что в случае разрыва соединения с RabbitMQ актор действительно будет пытаться время от времени его восстановить. В том числе, после перезапуска актор действительно возвращается назад к состоянию receive.
case r: Publish => // PubSubClient.publish
val exchange = exchangeName(r.topic)
val fullMsgJson = JArray(List(JString(r.topic), JString(r.msg)))
val fullMsgStr = compact(fullMsgJson)
declareExchange(exchange)
chan.basicPublish(exchange, r.topic, null,
fullMsgStr.getBytes("UTF-8"))
sender() ! {}
case r: Subscribe => // PubSubClient.subscribe
context.watch(r.ref)
updateSubscribers(r.topic, s => s + r.ref)
sender() ! {}
case r: Unsubsribe => // PubSubClient.unsubscribe
context.unwatch(r.ref)
updateSubscribers(r.topic, s => s - r.ref)
sender() ! {}
case r: Terminated =>
for(topic <- subscriptions.keys) {
updateSubscribers(topic, s => s - r.actor)
}
Здесь происходит обработка основных сообщений. При получении Publish определяется имя соответствующего exchange’а и в него происходит публикация сообщения. Притом сообщение оборачивается в JSON и содержит внутри себя имя топика. Трудно сказать заранее, какая реализация будет лучше работать — с одним exchange’ом на все топики, по одному exchange’у на топик, или какой-то вариант посередине. Поэтому в данной реализации программист заранее задает желаемое количество exchange’ей, а отображение топика в exchange происходит по хэшу. Таким образом получаем общее решение.
При получении Subscribe актор делает watch подписчика и добавляет его в Map’у подписчиков на заданный топик. При получении Unsubsribe происходит обратное действие. Terminated приходит, если подписанный актор неожиданно умирает. В этом случае он отписывается от всех топиков.
val nextDelivery = Option(consumer.nextDelivery(waitTimeoutMs))
nextDelivery match {
case None => scheduleCheckDelivery()
case Some(delivery) =>
val fullMsg = new String(delivery.getBody, "UTF-8")
val JArray(List(JString(topic), JString(msg))) = parse(fullMsg)
val pubSubMsg = PubSubMessage(topic, msg)
for(ref <- subscriptions.getOrElse(topic, /* default */).refs) {
ref ! pubSubMsg
}
context.self ! CheckDelivery
}
Время от времени, например, раз в 10 мс, происходит проверка, нет ли в очереди новых сообщений. Если есть, из сообщения извлекается имя топика и происходит уведомление всех подписанных на этот топик акторов.
Интересно, как реализован метод declareExchange:
chan.exchangeDeclare(exchange, "topic", false, true, messageTtlArgs)
chan.queueBind(dummyQueueName, exchange, "", null)
}
private val messageTtlArgs: java.util.HashMap[String, AnyRef] = {
val messageTtlMs = 60000
val args = new java.util.HashMap[String, AnyRef]()
args.put("x-message-ttl", new Integer(messageTtlMs))
args
}
Во-первых, поскольку здесь мы используем RabbitMQ исключительно как in-memory шину, указывается максимальное время жизни всех сообщений, так как хранить их вечно нет смысла. Во-вторых, к новому excahnge’у тут сразу же bind’ится та самая dummy очередь. Дело в том, что здесь мы создаем самоуничтожающийся exchange. Если не осталось ни одной очереди, забайнженой к exchange’у, он будет удален. Но хотя бы одна очередь должна быть забайнжена, иначе удаления не произойдет. Вот для того, чтобы RabbitMQ гарантированно за нами все почистил, когда мы закроем соединение, и нужна dummy очередь. Наконец, в-третьих, что еще интересно, мы должны на всякий случай делать declareExchange каждый раз, когда хотим что-то сделать с exchange’ем (см обработку Publish выше), так как мы не знаем заранее, существует ли уже exchange, а если он и был кем-то когда-то создан, не был ли он удален в результате автоматической чистки или другим клиентом. Кстати, из тех же соображений клиенты должны время от времени делать вызов pubsub.subscribe
, а то вдруг сеть мигала и подписка уже потерялась.
Как видите, при использовании RabbitMQ нужно иметь ввиду некоторые нюансы его работы. Но в общем и целом, всем, кто работает с RabbitMQ, они известны и ничего супер сложного здесь нет. Полную версию исходного кода к этой заметке вы найдете в этом репозитории.
Вопросы, дополнения, а также любые мысли по теме и не очень, как обычно, приветствуются и могут быть оставлены в комментариях ниже.
Метки: Scala, Функциональное программирование.
Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.