Работа с MessagePack и MessagePack RPC в Haskell

9 октября 2013

MessagePack — это формат, напоминающий JSON, только более быстрый и более компактный. Например, {"a":1,"b":2} занимает 13 байт в JSON, 19 байт в BSON и всего лишь 7 байт в MessagePack. MessagePack RPC представляет собой протокол удаленного вызова процедур, основанный на MessagePack. Полная реализация MessagePack RPC предоставляет синхронный и асинхронный обмен сообщениями по TCP, UDP или через Unix-сокеты. Давайте выясним, как работать со всем этим хозяйством в Haskell.

Работа с самим MessagePack не очень интересна. Сериализация происходит с помощью функции pack, а десериализация — с помощью unpack:

ghci> import Data.MessagePack as MP
ghci> let msg = MP.pack ["aaa", "bbb", "ccc"]
ghci> (MP.unpack msg) :: [String]
["aaa","bbb","ccc"]

Куда интереснее попробовать MessagePack RPC. Вообще, когда имеет смысл его использовать и чем плоха всеми нами любимая связка из JSON и HTTP?

Пусть имеется некое приложение, написанное на Haskell, которому приходится часто обращаться к различным счетам в системе. Поскольку постоянно ходить в базу за счетами занимает слишком много времени, приложение вынуждено сразу после запуска подгружать счета в оперативную память. При этом загрузка всех счетов занимает существенное время. Haskell — это вам не Erlang, горячего обновления кода в нем (пока что?) нет. Если мы решаем выкатить фикс и перезапустить приложение, оно не сможет отвечать на запросы пользователей до тех пор, пока не инициализируется кэш счетов.

Что же делать? Одно из возможных решений заключается в том, чтобы разбить приложение на два приложения. Первое приложение будет отвечать за хранение кэша счетов и выполнение некой незамысловатой логики. Предполагается, что эта логика меняется очень редко. Второе приложение будет отвечать за все остальное. При выкатке фикса придется перезапустить только второе приложение. Поскольку кэш хранится в первом приложении, перезапуск произойдет быстро и пользователи ничего не заметят. Каждую секунду в системе могут выполняться тысячи операций со счетами, поэтому REST API для взаимодействия приложений в данном случае, по видимому, не подходит. Но можно попробовать MsgPack RPC.

Простейшая реализация сервера (приложения-кэша) выглядит как-то так:

import qualified Data.Map.Strict as M
import qualified Data.ByteString as BS
import Control.Monad.IO.Class
import Control.Concurrent.STM
import Network.MessagePackRpc.Server

rpcRead :: Ord k => TVar (M.Map k v) -> k -> Method (Maybe v)
rpcRead tm k = do
  m <- liftIO.atomically $ readTVar tm
  return $ M.lookup k m

rpcWrite :: Ord k => TVar (M.Map k v) -> k -> v -> Method ()
rpcWrite tm k v =
  liftIO.atomically $ modifyTVar' tm $
    M.insert k v

port :: Int
port = 1234

main :: IO ()
main = do
  tm <- newTVarIO (M.empty :: M.Map BS.ByteString BS.ByteString)
  putStrLn $ "Starting msgpack-rpc server on port " ++ show port
  serve port
    [ ("read", toMethod $ rpcRead tm)
    , ("write", toMethod $ rpcWrite tm)
    ]

Состояние приложения представляет собой строгий Map с ключом типа ByteString и значением типа ByteString. Чтобы не возникало проблем при хождении в Map нескольких потоков одновременно, используется транзакционная память. Никакой другой логики, скажем, хождения в базу данных при запуске приложения, нет, ибо пример все же как бы игрушечный.

А вот и код клиента:

import Data.Time
import Data.Maybe
import Control.Monad
import Control.Monad.IO.Class
import Control.DeepSeq
import Data.ByteString.Char8 hiding (putStrLn)
import Network.MessagePackRpc.Client

rpcRead :: ByteString -> Client (Maybe ByteString)
rpcRead = call "read"

rpcWrite :: ByteString -> ByteString -> Client ()
rpcWrite = call "write"

writeLog :: String -> Client ()
writeLog msg = liftIO $ do
  t <- getCurrentTime
  putStrLn $ "[" ++ show t ++ "] " ++ msg

actionsNumber :: Int
actionsNumber = 100000

main = runClient (pack "localhost") 1234 $ do
  writeLog "Preparing kvList..."
  let kvList = force [ pack $ show x | x <- [1..actionsNumber] ]
  writeLog "Writing to cache..."
  forM_ kvList $ \x -> rpcWrite x x
  writeLog "Writing finished"
  writeLog "Reading from cache..."
  readRslt <- forM kvList rpcRead
  writeLog "Reading finished"
  let s = sum [ (read $ unpack x) :: Integer | x <- catMaybes readRslt]
  writeLog $ "Sum: " ++ show s

Клиент тоже игрушечный, поэтому он просто делает сто тысяч записей в кэш, а затем — сто тысяч чтений. На моем ноутике получилось ~7700 операций чтения и записи в секунду или около 0.13 миллисекунд на чтение/запись. Чтение и запись происходят с одинаковой скоростью. При запуске одновременно двух клиентов наблюдалось ~5150 операций в секунду или 0.19 миллисекунд на одну операцию (с точки зрения одного клиента!). Примечательно, что в Erlang на этом же ноутике одно хождение через gen_server:call на соседнюю ноду занимает в среднем 0.21 миллисекунды, что как бы сравнимо.

Понятно, что на самом деле эти бенчмарки мало о чем говорят. Все приложения уникальны. В одних передается больше данных, в других — меньше, но при этом нужно производить больше расчетов. Где-то можно уменьшить количество передаваемых сообщений, поместив больше логики в сервер, а где-то можно применить пакетное чтение и пакетную запись. Иногда имеет смысл расшардить данные по нескольким map’ам или нескольким серверам. В зависимости от ситуации могут быть применены те или иные оптимизации. Каждый случай нужно рассматривать отдельно.

Как по мне, MsgPack выглядит очень даже няшненько. Написав менее 30 строк кода мы фактически получили простую документо-ориентированную in-memory СУБД, выполняющую свыше 10000 операций в секунду. Вы спросите, почему документо-ориентированную, а не key-value? Потому что значением может быть закодированный с помощью MsgPack объект.

Также мы автоматически получили что-то вроде remsh для Haskell:

$ cabal repl kv-client
ghci> :set -XOverloadedStrings
ghci> runClient "localhost" 1234 $ do rpcWrite "aaa" "aaa"; r <- rpcRead "aaa"; liftIO $ putStrLn $ show r
Just "aaa"

Если не хочется писать много шаблонного кода, воспользуйтесь Template Haskell.

Что же до недостатков конкретно Haskell’евского пакета — кажется, кое-что из MsgPack RPC в нем еще не реализовано, например, асинхронные вызовы и поддержка UDP.

Ссылки по теме:

Исходники к этой заметке вы найдете в этом архиве. Для их сборки обновите cabal до версии 1.18 или старше, после чего используйте команды cabal sandbox init и cabal install.

Дополнение: Вас также могут заинтересовать статьи Сериализация и десериализация в/из Protobuf на C++ и Сериализация в языке Go на примере библиотеки codec.

Метки: , .


Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.