← На главную

Использование кондуитов (conduits) в Haskell

В мире Haskell есть такой хороший пакет conduit за авторством небезызвестного Michael Snoyman. Этот пакет позволяет легко и непринужденно делать множество полезных вещей с файлами, сокетами, соединениями с базами данных, всякими там TQueue или даже обычными списками. На первый взгляд пакет кажется сложным и пугающим. Но, как это часто бывает с Haskell, если не пытаться вникнуть во все наскоком, а сесть и спокойно разобраться, выясняется, что в действительности все предельно просто.

Сразу сделаю оговорку по поводу перевода слова «conduit». Варианты «канал» или «трубопровод» плохи тем, что не позволяют по переводу однозначно определить оригинальный термин. Слово «трубопровод» может быть переведено на английский язык как conduit или pipe, а пайпам в мире Haskell соответствует совершенно другой пакет. Аналогично «канал» может быть переведен как conduit, channel, canal и другими способами. Далее по тексту я буду использовать вариант «кондуит», как это было сделано в русском переводе книги Developing Web Applications with Haskell and Yesod.

Пристегните ремни. Сегодня мы не только изучим основы работы с кондуитами, но и научимся создавать с их помощью HTTP-сервера, передающие в одном ответе сколь угодно большие, хоть бесконечные, объемы данных, используя при этом постоянный объем памяти. Также мы узнаем, как писать соответствующие HTTP-клиенты.

Матчасть

Итак, что же нам дают кондуиты? На мой взгляд, самое полезное их свойство заключается в возможности писать хорошо совместимые друг с другом компоненты. Например, один программист может написать код для записи данных в файл, второй программист – код для получения данных по протоколу HTTP, а третий – для распаковки данных, сжатых gzip. Затем четвертый программист с помощью всего лишь нескольких строк кода свяжет эти компоненты в программу, которая скачивает сжатые данные по HTTP, распаковывает их и записывает в файл на диске.

Не менее важные свойства кондуитов состоят в следующем. Все это будет происходить в постоянном объеме памяти, даже если файл имеет размер в несколько гигабайт. Закрытие всех файловых дескрипторов произойдет предсказуемым образом, сразу, как только закончится передача данных. Если в одном из компонентов будет брошено исключение (закончится место на диске, мигнет сеть или придет некорректный gzip), файловые дескрипторы также не утекут, а будут сразу же закрыты. Наконец, кондуиты строго статически типизированы и прекрасно параллелятся.

Основные типы:

  • Source m a – источник данных типа a в монаде m;
  • Sink a m b – сток, в который приходят данные типа a в монаде m, а в результате возвращается тип b;
  • Conduit a m b – кондуит, соединяющий источник, порождающий данные типа a, и сток, принимающий данные типа b;

В более привычных терминах источник – это продюсер, синк – консьюмер, а кондуит – это связующее звено и некое преобразование между продюсером и консьюмером. При этом кондуит может иметь некоторое внутреннее состояние и отдавать больше или меньше данных, нежели принимается.

Обратите внимание, что перечисленные выше типы представляют собой всего лишь type-обертки вокруг ConduitM:

-- Source - это ConduitM без входа и возвращаемого значения type Source m a = ConduitM () a m () -- Conduit - это ConduitM без возвращаемого значения type Conduit a m b = ConduitM a b m () -- Sink - это ConduitM без выхода type Sink a m b = ConduitM a Void m b

Скоро нам это пригодится.

Источник, сток и кондуит соединяются друг с другом при помощи следующих функций:

-- соединяет источник с кондуитом, создавая новый источник ($=) :: Monad m => Source m a -> Conduit a m b -> Source m b -- соединяет кондуит со стоком, создавая новый сток (=$) :: Monad m => Conduit a m b -> Sink b m c -> Sink a m c -- соединяет источник со стоком ($$) :: Monad m => Source m a -> Sink a m b -> m b

Кроме того, есть функция (=$=), которая соединяет кондуит с кондуитом, создавая новый кондуит.

Копирование файлов

Давайте запустим ghci и попробуем произвести простое копирование файлов при помощи кондуитов.

ghci> :m + Data.ByteString ghci> :m + Data.Conduit ghci> :m + Data.Conduit.Binary ghci> :t sourceFile sourceFile :: MonadResource m => FilePath -> ConduitM i ByteString m ()

Вспомним, что Source представляет собой синонимом типа:

ghci> :i Source type Source m o = ConduitM () o m () -- Defined in `Data.Conduit.Internal'

Теперь видно, что функция sourceFile принимает имя файла и возвращает источник, генерирующий ByteString.

ghci> let source = sourceFile "./src/CopyFile.hs" :: Source ⏎ (ResourceT IO) ByteString

Здесь ResourceT – это трансформатор монады, решающий ту же задачу, что и функции bracket или withFile, но ResourceT при этом является более гибким инструментом. Объяснение трансформаторов монад выходит за рамки данного поста, но в них нет абсолютно ничего страшного. Можно думать о них, как об обычных монадах, параметризуемых другими монадами. Функция runResourceT «выполняет» ResourceT подобно тому, как runState «выполняет» монаду State. Ресурсы, выделяемые в ResourceT гарантированно освобождаются к моменту выхода из функции runResourceT, подобно тому, как это работает для withFile.

Ок, теперь у нас есть файл-источник. Если мы хотим скопировать его в другой файл, нам, видимо, понадобится файл-сток:

ghci> :t sinkFile sinkFile :: MonadResource m => FilePath -> ConduitM ByteString o m () ghci> :i Sink type Sink i m r = ConduitM i void-0.6.1:Data.Void.Void m r -- Defined in `Data.Conduit.Internal' ghci> let sink = sinkFile "/tmp/copy.hs" :: Sink ByteString (ResourceT IO) ()

Вспомним, что для объединения источника со стоком служит функция ($$):

ghci> :t ($$) ($$) :: Monad m => Source m a -> Sink a m b -> m b ghci> :t source $$ sink source $$ sink :: ResourceT IO ()

Как уже отмечалось, для выполнения действий, обернутых в ResourceT, предназначена функция runResourceT:

ghci> :t runResourceT runResourceT :: MonadBaseControl IO m => ResourceT m a -> m a ghci> :t runResourceT $ source $$ sink runResourceT $ source $$ sink :: IO () ghci> runResourceT $ source $$ sink

В результате файл ./src/CopyFile.hs будет скопирован в /tmp/copy.hs.

А вот как это будет выглядеть не в ghci, а в нормальном коде на Haskell:

import Data.Conduit import Data.Conduit.Binary main :: IO () main = do let source = sourceFile "./src/CopyFile.hs" sink = sinkFile "/tmp/copy.hs" runResourceT $ source $$ sink

Разве это не прекрасно!

Пробуем создать свой кондуит

Только что мы научились использовать источники и стоки. Давайте теперь попробуем написать простой кондуит. Для начала напишем совершенно бесполезный кондуит, который просто копирует данные, ничего с ними не делая.

Помним, что Conduit является синонимом типа:

ghci> :i Conduit type Conduit i m o = ConduitM i o m ()

Основные функции для создания кондуита следующие:

ghci> :t await await :: Monad m => ConduitM i o m (Maybe i) ghci> :t yield yield :: Monad m => o -> ConduitM i o m () ghci> :t leftover leftover :: i -> ConduitM i o m ()

Функция await забирает данные из присоединенного источника или кондуита, а yield пишет данные в присоединенный сток или кондуит. Функция leftover кладет значение обратно в очередь входных данных. Это значение будет прочитано при следующем вызове await.

В общем-то, этих знаний достаточно для того, чтобы написать копирующий кондуит:

ghci> let copyConduit = await >>= \x -> case x of Nothing -> ⏎ return (); Just j -> yield j :: Conduit ByteString (ResourceT IO) ByteString ghci> :t copyConduit copyConduit :: ConduitM ByteString ByteString (ResourceT IO) ()

Вспомним, что для присоединения кондуита к стоку используется функция (=$), для соединения источника и кондуита – ($=), а для соединения двух кондуитов предназначена (=$=):

ghci> :t copyConduit =$ sink copyConduit =$ sink :: Sink ByteString (ResourceT IO) () ghci> :t source $= copyConduit source $= copyConduit :: Source (ResourceT IO) ByteString ghci> :t copyConduit =$= copyConduit copyConduit =$= copyConduit :: ConduitM ByteString ByteString (ResourceT IO) ()

Вообще, запомнить, какая функция что делает, несложно. С той стороны, где параметром должен быть кондуит, функции имеют знак равенства, напоминающий трубу. Со стороны, где параметром должен быть источник или сток, ставится доллар, похожий на волну. Легко заметить, что соединить источник, кондуит и сток, можно разными способами. Все они равнозначны:

ghci> :t source $$ copyConduit =$ sink source $$ copyConduit =$ sink :: ResourceT IO () ghci> :t source $= copyConduit $$ sink source $= copyConduit $$ sink :: ResourceT IO () ghci> runResourceT $ source $= copyConduit $$ sink

Если бы кондуит был не копирующим, а, скажем, шифрующим, в первом случае мы бы получили сток, шифрующих входные данные, а во втором – источник зашифрованных данных. Соединяйте источники, кондуиты и стоки в порядке, который больше соответствует вашему пониманию решения задачи.

Кстати, а давайте напишем шифрующий кондуит:

ghci> :m + Data.Bits ghci> let cryptConduit = await >>= \x -> case x of Nothing -> ⏎ return (); Just j -> yield (pack $ Prelude.map (`xor` 42) $ ⏎ unpack j) :: Conduit ByteString (ResourceT IO) ByteString ghci> runResourceT $ source $= cryptConduit $$ sink

Правда круто?

Кондуиты в Scotty

Помните, как мы писали RESTful сервис на Scotty? В тот раз мы просто читали все данные из БД в память, после чего кодировали их в JSON и отдавали клиенту. А что, если бы у нас было 5 Гб данных? Или вообще бесконечный поток неких событий? Понятно, что в этом случае мы не можем класть данные в память. Тут-то на помощь и приходят кондуиты!

Рассмотрим пример:

{-# LANGUAGE OverloadedStrings #-} import Web.Scotty import Data.Conduit import Blaze.ByteString.Builder import qualified Data.ByteString.Lazy as LBS import Data.Aeson import Control.Monad import Control.Monad.Trans import Control.Concurrent (threadDelay) main = scotty 8080 $ do get "/api/v1.0/streams/simple" getSimpleStream get "/api/v1.0/streams/counter" getCounterStream getSimpleStream = do header "Content-type" "application/json" source simpleSource getCounterStream = do header "Content-type" "application/x-json-stream" source counterSource simpleSource :: Source (ResourceT IO) (Flush Builder) simpleSource = do sendJson [ "message" .= ("Hello!" :: String) ] counterSource :: Source (ResourceT IO) (Flush Builder) counterSource = do let num = 100 :: Int forM_ [1..num] $ \ctr -> do liftIO $ threadDelay 100000 sendJson [ "counter" .= ctr ] sendJson x = do sendLBS $ (encode $ object x) `LBS.append` "\n" yield Flush where sendLBS = yield . Chunk . fromLazyByteString

Для работы с кондуитами в Scotty есть функция source:

source :: Source (ResourceT IO) (Flush Builder) -> ActionM ()

Тип Flush предельно прост:

data Flush a = Chunk a | Flush

Благодаря ему источник может генерировать куски данных, обернутые в Chunk, а затем слать Flush, создавая эффект, подобный вызову сишной функции fflush. Эта возможность требуется во многих библиотеках, например, предназначенных для сжатия данных. В случае со Scotty в ответ на запрос посылается заголовок Transfer-Encoding: chunked, а очередной chunk посылается только после получения Flush.

Тип Builder предназначен для эффективного построения ByteString. Для создания билдера используются функции fromByteString и fromLazyByteString, а для создания ByteString’а из билдера – функции toByteString и toLazyByteString. Bulder является экземпляром класса Monoid, в связи с чем для объединения двух Builder’ов используется функция mappend, а для представления пустого билдера можно использовать функцию mempty. Вообще, идея та же, что и в каком-нибудь StringBuilder из мира Java.

В учетом вышесказанного, несложно разобраться, как работает функция sendJson. А все остальное нам уже знакомо.

Посмотрим на наш HTTP-сервер в действии:

$ curl localhost:8080/api/v1.0/streams/simple -N -D - HTTP/1.1 200 OK Server: Warp/1.3.9 Transfer-Encoding: chunked Content-type: application/json {"message":"Hello!"} $ curl localhost:8080/api/v1.0/streams/counter -N -D - HTTP/1.1 200 OK Server: Warp/1.3.9 Transfer-Encoding: chunked Content-type: application/x-json-stream {"counter":1} {"counter":2} {"counter":3} ( ... и так далее по одному сообщению раз в 100 мс ... ) {"counter":99} {"counter":100}

Теперь попробуем написать на кондуитах HTTP-клиента.

Пакет http-conduit

Начнем с простого клиента, который дергает конец streams/simple и сохраняет полученный ответ в файл simple.json:

import Network.HTTP.Conduit import Data.Conduit import Data.Conduit.Binary main = do manager <- newManager def req <- parseUrl "http://localhost:8080/api/v1.0/streams/simple" runResourceT $ do res <- http req manager responseBody res $$+- sinkFile "simple.json"

Для создания менеджера соединений используется функция newManager:

newManager :: ManagerSettings -> IO Manager

Тип ManagerSettings является экземпляром класса Data.Default, поэтому для получения настроек по умолчанию можно воспользоваться функцией def. Кстати, этот же прием удобно использовать для создания на Haskell функций как бы с необязательными именованными параметрами.

В несколько упрощенном виде тип функции http выглядит так:

http :: Request m -> Manager -> m (Response (ResumableSource m ByteString))

С первыми двумя параметрами все более-менее ясно. Тип Response представляет ответ сервера. Для извлечения тела ответа воспользуемся функцией responseBody:

responseBody :: Response body -> body

Очевидно, после этого мы получим тип ResumableSource m ByteString. Этот тип представляет источник, часть данных из которого можно передать в один сток, а оставшуюся часть – в другие стоки. Очень полезная возможность, если мы хотим, скажем, сохранить 20 Гб данных из сети в файловой системе FAT32. Для работы с ResumableSource предназначены следующие функции:

-- соединить источник со стоком и после -- закрытия стока получить ResumableSource ($$+) :: Monad m => Source m a -> Sink a m b -> m (ResumableSource m a, b) -- соединить ResumableSource со следующим стоком и получить -- новый ResumableSource ($$++) :: Monad m => ResumableSource m a -> Sink a m b -> m (ResumableSource m a, b) -- соединить ResumableSource с последним стоком ($$+-) :: Monad m => ResumableSource m a -> Sink a m b -> m b

Сейчас мы просто хотим сохранить тело ответа в файл, поэтому говорим:

responseBody res $$+- sinkFile "simple.json"

Рассмотрим чуть более сложный пример:

{-# LANGUAGE OverloadedStrings #-} import Data.Conduit import Network.HTTP.Conduit import qualified Data.Conduit.Binary as CB import qualified Data.ByteString.Char8 as BS import Control.Monad.Trans main = do manager <- newManager def req <- parseUrl "http://localhost:8080/api/v1.0/streams/counter" let headers = requestHeaders req req' = req { requestHeaders = ("Accept", "application/x-json-stream") : headers } runResourceT $ do res <- http req' manager responseBody res $$+- CB.lines =$ counterSink counterSink :: Sink BS.ByteString (ResourceT IO) () counterSink = do md <- await case md of Nothing -> return () Just d -> do liftIO $ BS.putStrLn "--------" liftIO $ BS.putStrLn d counterSink

Здесь для порядку мы передали правильный Accept-заголовок, а для обработки данных написали собственный сток. Как видите, тут используется тот же набор функций, что был применен при написании шифрующего кондуита. Все благодаря тому, что Source, Sink и Conduit – это все на самом деле ConduitM. Строго говоря, нет никакой гарантии, что один JSON-объект будет приходить в одном chunk’е, поэтому мы нарезаем HTTP-поток на строки с помощью кондуита lines из модуля Data.Conduit.Binary.

Заключение

Настоятельно рекомендую обратить внимание на модуль Data.Conduit.List, в нем содержится немало полезных функций. Все они довольно простые. Я уверен, вы сами во всем разберетесь.

Несмотря на то, что этот пост содержит довольно много букв, в действительности пользоваться кондуитами очень просто. Все-то нужно осилить пару типов и десяток функций для работы с ними. Один раз попробовав использовать кондуиты, вы перестанете понимать, как вообще без них можно писать что-то серьезное.

Ссылки по теме:

  • В пакете stm-conduit вы найдете функцию (>=<), предназначенную для объединения нескольких источников в один, функции sourceTMChan, sinkTMChan и tryReadTBMChan, с помощью которых можно «нарезать» источник на несколько источников, а также функцию buffer, позволяющую выполнять работу в источнике и стоке параллельно;
  • Кондуиты хорошо объяснены в книге Developing Web Applications with Haskell and Yesod все того же Michael Snoyman, в том числе там приводятся примеры использования пакета xml-conduit;

Все исходники к этой заметке вы найдете в этом архиве. Как обычно, для их сборки используйте команду cabal-dev install. А если вы уже обновились до cabal 1.18, то можете собрать код командой cabal sandbox init; cabal install без необходимости ставить cabal-dev. Подробности о сборке проектов на Haskell вы найдете в этой заметке.

Если вы честно прочитали этот пост от начала до конца и все поняли, то имеете полное право гордиться собой. Сходите в ближайшее кафе и закажите себе какую-нибудь вкусняшку. Вы это заслужили.