Использование кондуитов (conduits) в Haskell

11 сентября 2013

В мире Haskell есть такой хороший пакет conduit за авторством небезызвестного Michael Snoyman. Этот пакет позволяет легко и непринужденно делать множество полезных вещей с файлами, сокетами, соединениями с базами данных, всякими там TQueue или даже обычными списками. На первый взгляд пакет кажется сложным и пугающим. Но, как это часто бывает с Haskell, если не пытаться вникнуть во все наскоком, а сесть и спокойно разобраться, выясняется, что в действительности все предельно просто.

Сразу сделаю оговорку по поводу перевода слова «conduit». Варианты «канал» или «трубопровод» плохи тем, что не позволяют по переводу однозначно определить оригинальный термин. Слово «трубопровод» может быть переведено на английский язык как conduit или pipe, а пайпам в мире Haskell соответствует совершенно другой пакет. Аналогично «канал» может быть переведен как conduit, channel, canal и другими способами. Далее по тексту я буду использовать вариант «кондуит», как это было сделано в русском переводе книги Developing Web Applications with Haskell and Yesod.

Пристегните ремни. Сегодня мы не только изучим основы работы с кондуитами, но и научимся создавать с их помощью HTTP-сервера, передающие в одном ответе сколь угодно большие, хоть бесконечные, объемы данных, используя при этом постоянный объем памяти. Также мы узнаем, как писать соответствующие HTTP-клиенты.

Матчасть

Итак, что же нам дают кондуиты? На мой взгляд, самое полезное их свойство заключается в возможности писать хорошо совместимые друг с другом компоненты. Например, один программист может написать код для записи данных в файл, второй программист — код для получения данных по протоколу HTTP, а третий — для распаковки данных, сжатых gzip. Затем четвертый программист с помощью всего лишь нескольких строк кода свяжет эти компоненты в программу, которая скачивает сжатые данные по HTTP, распаковывает их и записывает в файл на диске.

Не менее важные свойства кондуитов состоят в следующем. Все это будет происходить в постоянном объеме памяти, даже если файл имеет размер в несколько гигабайт. Закрытие всех файловых дескрипторов произойдет предсказуемым образом, сразу, как только закончится передача данных. Если в одном из компонентов будет брошено исключение (закончится место на диске, мигнет сеть или придет некорректный gzip), файловые дескрипторы также не утекут, а будут сразу же закрыты. Наконец, кондуиты строго статически типизированы и прекрасно параллелятся.

Основные типы:

  • Source m a — источник данных типа a в монаде m;
  • Sink a m b — сток, в который приходят данные типа a в монаде m, а в результате возвращается тип b;
  • Conduit a m b — кондуит, соединяющий источник, порождающий данные типа a, и сток, принимающий данные типа b;

В более привычных терминах источник — это продюсер, синк — консьюмер, а кондуит — это связующее звено и некое преобразование между продюсером и консьюмером. При этом кондуит может иметь некоторое внутреннее состояние и отдавать больше или меньше данных, нежели принимается.

Обратите внимание, что перечисленные выше типы представляют собой всего лишь type-обертки вокруг ConduitM:

-- Source - это ConduitM без входа и возвращаемого значения
type Source m a    = ConduitM () a m ()

-- Conduit - это ConduitM без возвращаемого значения
type Conduit a m b = ConduitM a b m ()

-- Sink - это ConduitM без выхода
type Sink a m b    = ConduitM a Void m b

Скоро нам это пригодится.

Источник, сток и кондуит соединяются друг с другом при помощи следующих функций:

-- соединяет источник с кондуитом, создавая новый источник
($=)  :: Monad m => Source m a -> Conduit a m b -> Source m b

-- соединяет кондуит со стоком, создавая новый сток
(=$)  :: Monad m => Conduit a m b -> Sink b m c -> Sink a m c

-- соединяет источник со стоком
($$)  :: Monad m => Source m a -> Sink a m b -> m b

Кроме того, есть функция (=$=), которая соединяет кондуит с кондуитом, создавая новый кондуит.

Копирование файлов

Давайте запустим ghci и попробуем произвести простое копирование файлов при помощи кондуитов.

ghci> :m + Data.ByteString
ghci> :m + Data.Conduit
ghci> :m + Data.Conduit.Binary
ghci> :t sourceFile
sourceFile
  :: MonadResource m => FilePath -> ConduitM i ByteString m ()

Вспомним, что Source представляет собой синонимом типа:

ghci> :i Source
type Source m o = ConduitM () o m ()
    -- Defined in `Data.Conduit.Internal'

Теперь видно, что функция sourceFile принимает имя файла и возвращает источник, генерирующий ByteString.

ghci> let source = sourceFile "./src/CopyFile.hs" :: Source (ResourceT IO) ByteString

Здесь ResourceT — это трансформатор монады, решающий ту же задачу, что и функции bracket или withFile, но ResourceT при этом является более гибким инструментом. Объяснение трансформаторов монад выходит за рамки данного поста, но в них нет абсолютно ничего страшного. Можно думать о них, как об обычных монадах, параметризуемых другими монадами. Функция runResourceT «выполняет» ResourceT подобно тому, как runState «выполняет» монаду State. Ресурсы, выделяемые в ResourceT гарантированно освобождаются к моменту выхода из функции runResourceT, подобно тому, как это работает для withFile.

Ок, теперь у нас есть файл-источник. Если мы хотим скопировать его в другой файл, нам, видимо, понадобится файл-сток:

ghci> :t sinkFile
sinkFile
  :: MonadResource m => FilePath -> ConduitM ByteString o m ()
ghci> :i Sink
type Sink i m r = ConduitM i void-0.6.1:Data.Void.Void m r
    -- Defined in `Data.Conduit.Internal'
ghci> let sink = sinkFile "/tmp/copy.hs" :: Sink ByteString (ResourceT IO) ()

Вспомним, что для объединения источника со стоком служит функция ($$):

ghci> :t ($$)
($$) :: Monad m => Source m a -> Sink a m b -> m b
ghci> :t source $$ sink
source $$ sink :: ResourceT IO ()

Как уже отмечалось, для выполнения действий, обернутых в ResourceT, предназначена функция runResourceT:

ghci> :t runResourceT
runResourceT :: MonadBaseControl IO m => ResourceT m a -> m a
ghci> :t runResourceT $ source $$ sink
runResourceT $ source $$ sink :: IO ()
ghci> runResourceT $ source $$ sink

В результате файл ./src/CopyFile.hs будет скопирован в /tmp/copy.hs.

А вот как это будет выглядеть не в ghci, а в нормальном коде на Haskell:

import Data.Conduit
import Data.Conduit.Binary

main :: IO ()
main = do
  let source = sourceFile "./src/CopyFile.hs"
      sink   = sinkFile   "/tmp/copy.hs"
  runResourceT $ source $$ sink

Разве это не прекрасно!

Пробуем создать свой кондуит

Только что мы научились использовать источники и стоки. Давайте теперь попробуем написать простой кондуит. Для начала напишем совершенно бесполезный кондуит, который просто копирует данные, ничего с ними не делая.

Помним, что Conduit является синонимом типа:

ghci> :i Conduit
type Conduit i m o = ConduitM i o m ()

Основные функции для создания кондуита следующие:

ghci> :t await
await :: Monad m => ConduitM i o m (Maybe i)
ghci> :t yield
yield :: Monad m => o -> ConduitM i o m ()
ghci> :t leftover
leftover :: i -> ConduitM i o m ()

Функция await забирает данные из присоединенного источника или кондуита, а yield пишет данные в присоединенный сток или кондуит. Функция leftover кладет значение обратно в очередь входных данных. Это значение будет прочитано при следующем вызове await.

В общем-то, этих знаний достаточно для того, чтобы написать копирующий кондуит:

ghci> let copyConduit = await >>= \x -> case x of Nothing -> return (); Just j -> yield j :: Conduit ByteString (ResourceT IO) ByteString
ghci> :t copyConduit
copyConduit :: ConduitM ByteString ByteString (ResourceT IO) ()

Вспомним, что для присоединения кондуита к стоку используется функция (=$), для соединения источника и кондуита — ($=), а для соединения двух кондуитов предназначена (=$=):

ghci> :t copyConduit =$ sink
copyConduit =$ sink :: Sink ByteString (ResourceT IO) ()
ghci> :t source $= copyConduit
source $= copyConduit :: Source (ResourceT IO) ByteString
ghci> :t copyConduit =$= copyConduit
copyConduit =$= copyConduit
  :: ConduitM ByteString ByteString (ResourceT IO) ()

Вообще, запомнить, какая функция что делает, несложно. С той стороны, где параметром должен быть кондуит, функции имеют знак равенства, напоминающий трубу. Со стороны, где параметром должен быть источник или сток, ставится доллар, похожий на волну. Легко заметить, что соединить источник, кондуит и сток, можно разными способами. Все они равнозначны:

ghci> :t source $$ copyConduit =$ sink
source $$ copyConduit =$ sink :: ResourceT IO ()
ghci> :t source $= copyConduit $$ sink
source $= copyConduit $$ sink :: ResourceT IO ()
ghci> runResourceT $ source $= copyConduit $$ sink

Если бы кондуит был не копирующим, а, скажем, шифрующим, в первом случае мы бы получили сток, шифрующих входные данные, а во втором — источник зашифрованных данных. Соединяйте источники, кондуиты и стоки в порядке, который больше соответствует вашему пониманию решения задачи.

Кстати, а давайте напишем шифрующий кондуит:

ghci> :m + Data.Bits
ghci> let cryptConduit = await >>= \x -> case x of Nothing -> return (); Just j -> yield (pack $ Prelude.map (`xor` 42) $ unpack j) :: Conduit ByteString (ResourceT IO) ByteString
ghci> runResourceT $ source $= cryptConduit $$ sink

Правда круто?

Кондуиты в Scotty

Помните, как мы писали RESTful сервис на Scotty? В тот раз мы просто читали все данные из БД в память, после чего кодировали их в JSON и отдавали клиенту. А что, если бы у нас было 5 Гб данных? Или вообще бесконечный поток неких событий? Понятно, что в этом случае мы не можем класть данные в память. Тут-то на помощь и приходят кондуиты!

Рассмотрим пример:

{-# LANGUAGE OverloadedStrings #-}

import Web.Scotty
import Data.Conduit
import Blaze.ByteString.Builder
import qualified Data.ByteString.Lazy as LBS
import Data.Aeson
import Control.Monad
import Control.Monad.Trans
import Control.Concurrent (threadDelay)

main = scotty 8080 $ do
  get "/api/v1.0/streams/simple"  getSimpleStream
  get "/api/v1.0/streams/counter" getCounterStream

getSimpleStream = do
  header "Content-type" "application/json"
  source simpleSource

getCounterStream = do
  header "Content-type" "application/x-json-stream"
  source counterSource

simpleSource :: Source (ResourceT IO) (Flush Builder)
simpleSource = do
  sendJson [ "message" .= ("Hello!" :: String) ]

counterSource :: Source (ResourceT IO) (Flush Builder)
counterSource = do
  let num = 100 :: Int
  forM_ [1..num] $ \ctr -> do
                     liftIO $ threadDelay 100000
                     sendJson [ "counter" .= ctr ]

sendJson x = do
  sendLBS $ (encode $ object x) `LBS.append` "\n"
  yield Flush
  where
    sendLBS = yield . Chunk . fromLazyByteString

Для работы с кондуитами в Scotty есть функция source:

source :: Source (ResourceT IO) (Flush Builder) -> ActionM ()

Тип Flush предельно прост:

data Flush a = Chunk a | Flush

Благодаря ему источник может генерировать куски данных, обернутые в Chunk, а затем слать Flush, создавая эффект, подобный вызову сишной функции fflush. Эта возможность требуется во многих библиотеках, например, предназначенных для сжатия данных. В случае со Scotty в ответ на запрос посылается заголовок Transfer-Encoding: chunked, а очередной chunk посылается только после получения Flush.

Тип Builder предназначен для эффективного построения ByteString. Для создания билдера используются функции fromByteString и fromLazyByteString, а для создания ByteString’а из билдера — функции toByteString и toLazyByteString. Bulder является экземпляром класса Monoid, в связи с чем для объединения двух Builder’ов используется функция mappend, а для представления пустого билдера можно использовать функцию mempty. Вообще, идея та же, что и в каком-нибудь StringBuilder из мира Java.

В учетом вышесказанного, несложно разобраться, как работает функция sendJson. А все остальное нам уже знакомо.

Посмотрим на наш HTTP-сервер в действии:

$ curl localhost:8080/api/v1.0/streams/simple -N -D -
HTTP/1.1 200 OK
Server: Warp/1.3.9
Transfer-Encoding: chunked
Content-type: application/json

{"message":"Hello!"}

$ curl localhost:8080/api/v1.0/streams/counter -N -D -
HTTP/1.1 200 OK
Server: Warp/1.3.9
Transfer-Encoding: chunked
Content-type: application/x-json-stream

{"counter":1}
{"counter":2}
{"counter":3}
( ... и так далее по одному сообщению раз в 100 мс ... )
{"counter":99}
{"counter":100}

Теперь попробуем написать на кондуитах HTTP-клиента.

Пакет http-conduit

Начнем с простого клиента, который дергает конец streams/simple и сохраняет полученный ответ в файл simple.json:

import Network.HTTP.Conduit
import Data.Conduit
import Data.Conduit.Binary

main = do
  manager <- newManager def
  req <- parseUrl "http://localhost:8080/api/v1.0/streams/simple"
  runResourceT $ do
    res <- http req manager
    responseBody res $$+- sinkFile "simple.json"

Для создания менеджера соединений используется функция newManager:

newManager :: ManagerSettings -> IO Manager

Тип ManagerSettings является экземпляром класса Data.Default, поэтому для получения настроек по умолчанию можно воспользоваться функцией def. Кстати, этот же прием удобно использовать для создания на Haskell функций как бы с необязательными именованными параметрами.

В несколько упрощенном виде тип функции http выглядит так:

http
  :: Request m
  -> Manager
  -> m (Response (ResumableSource m ByteString))

С первыми двумя параметрами все более-менее ясно. Тип Response представляет ответ сервера. Для извлечения тела ответа воспользуемся функцией responseBody:

responseBody :: Response body -> body

Очевидно, после этого мы получим тип ResumableSource m ByteString. Этот тип представляет источник, часть данных из которого можно передать в один сток, а оставшуюся часть — в другие стоки. Очень полезная возможность, если мы хотим, скажем, сохранить 20 Гб данных из сети в файловой системе FAT32. Для работы с ResumableSource предназначены следующие функции:

-- соединить источник со стоком и после
--   закрытия стока получить ResumableSource
($$+)
  :: Monad m =>
     Source m a -> Sink a m b -> m (ResumableSource m a, b)

-- соединить ResumableSource со следующим стоком и получить
--   новый ResumableSource
($$++)
  :: Monad m =>
     ResumableSource m a -> Sink a m b -> m (ResumableSource m a, b)

-- соединить ResumableSource с последним стоком
($$+-) :: Monad m => ResumableSource m a -> Sink a m b -> m b

Сейчас мы просто хотим сохранить тело ответа в файл, поэтому говорим:

responseBody res $$+- sinkFile "simple.json"

Рассмотрим чуть более сложный пример:

{-# LANGUAGE OverloadedStrings #-}
import Data.Conduit
import Network.HTTP.Conduit
import qualified Data.Conduit.Binary as CB
import qualified Data.ByteString.Char8 as BS
import Control.Monad.Trans

main = do
  manager <- newManager def
  req <- parseUrl "http://localhost:8080/api/v1.0/streams/counter"
  let headers = requestHeaders req
      req' = req {
          requestHeaders = ("Accept", "application/x-json-stream") :
                           headers
        }
  runResourceT $ do
    res <- http req' manager
    responseBody res $$+- CB.lines =$ counterSink

counterSink :: Sink BS.ByteString (ResourceT IO) ()
counterSink = do
  md <- await
  case md of
    Nothing -> return ()
    Just d -> do
      liftIO $ BS.putStrLn "--------"
      liftIO $ BS.putStrLn d
      counterSink

Здесь для порядку мы передали правильный Accept-заголовок, а для обработки данных написали собственный сток. Как видите, тут используется тот же набор функций, что был применен при написании шифрующего кондуита. Все благодаря тому, что Source, Sink и Conduit — это все на самом деле ConduitM. Строго говоря, нет никакой гарантии, что один JSON-объект будет приходить в одном chunk’е, поэтому мы нарезаем HTTP-поток на строки с помощью кондуита lines из модуля Data.Conduit.Binary.

Заключение

Настоятельно рекомендую обратить внимание на модуль Data.Conduit.List, в нем содержится немало полезных функций. Все они довольно простые. Я уверен, вы сами во всем разберетесь.

Несмотря на то, что этот пост содержит довольно много букв, в действительности пользоваться кондуитами очень просто. Все-то нужно осилить пару типов и десяток функций для работы с ними. Один раз попробовав использовать кондуиты, вы перестанете понимать, как вообще без них можно писать что-то серьезное.

Ссылки по теме:

  • В пакете stm-conduit вы найдете функцию (>=<), предназначенную для объединения нескольких источников в один, функции sourceTMChan, sinkTMChan и tryReadTBMChan, с помощью которых можно «нарезать» источник на несколько источников, а также функцию buffer, позволяющую выполнять работу в источнике и стоке параллельно;
  • Кондуиты хорошо объяснены в книге Developing Web Applications with Haskell and Yesod все того же Michael Snoyman, в том числе там приводятся примеры использования пакета xml-conduit;

Все исходники к этой заметке вы найдете в этом архиве. Как обычно, для их сборки используйте команду cabal-dev install. А если вы уже обновились до cabal 1.18, то можете собрать код командой cabal sandbox init; cabal install без необходимости ставить cabal-dev. Подробности о сборке проектов на Haskell вы найдете в этой заметке.

Если вы честно прочитали этот пост от начала до конца и все поняли, то имеете полное право гордиться собой. Сходите в ближайшее кафе и закажите себе какую-нибудь вкусняшку. Вы это заслужили.

Метки: , .


Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.