Памятка по функциям Cloud Haskell

16 января 2014

На данный момент мы уже посмотрели на Cloud Haskell в действии и выяснили, в чем заключаются отличия Cloud Haskell от Erlang. Сегодня же мы посмотрим на основные функции, предоставляемые Cloud Haskell. В том числе, на функции для работы с типизированными каналами, а также функции receiveTimeout, register, whereis, getProcessInfo, и другие. Про распределенщину пока не будет, потому что этот вопрос я и сам еще не до конца осилил :)

Создание новой ноды

Поскольку Cloud Haskell — совершенно обыкновенная библиотека, мы вынуждены создавать ноды и запускать процессы на них явным образом. К счастью, делается это в несколько строк кода.

newLocalNode :: Transport -> RemoteTable -> IO LocalNode

Данная функция создает новую локальную ноду. В качестве аргументов функция принимает транспорт и remote table, которые можно получить, воспользовавшись следующими функциями. Cloud Haskell, в отличие от Erlang, позволяет создавать несколько нод в рамках одного процесса операционной системы.

initRemoteTable :: RemoteTable

Создает новую remote table. Эта таблица нужна для того, чтобы физически разделенные ноды при получении сообщений от других нод могли правильно идентифицировать объекты в системе, такие, как типы и функции.

createTransport :: HostName -> ServiceName -> TCPParameters -> IO (Either IOException Transport)

Создает новый TCP-транспорт. Как уже отмечалось ранее, Cloud Haskell может использовать разные реализации транспорта. Тип Transport предоставляет общий интерфейс к транспорту, независимый от реализации. HostName и ServiceName — это просто строки, задающие хост и порт, например, "127.0.0.1" и "4444". Аргумент TCPParameters можно получить при помощи следующей функции.

defaultTCPParameters :: TCPParameters

Возвращает параметры TCP по умолчанию. При желании программист может эти параметры потюнить.

Создание процессов

Теперь, когда у нас есть нода, мы можем создать на ней первый процесс.

forkProcess :: LocalNode -> Process () -> IO ProcessId

Запускает процесс на заданной локальной ноде и возвращает его идентификатор ProcessId. Легковесные процессы в Cloud Haskell на самом деле представляют собой те же процессы, что создает функция forkIO. Как уже отмечалось ранее, код в процессах Cloud Haskell выполняется в монаде Process.

runProcess :: LocalNode -> Process () -> IO ()

Аналогично forkProcess, но ждет завершения созданного процесса.

spawnLocal :: Process () -> Process ProcessId

Процессы Cloud Haskell также могут создавать новые процессы на текущей ноде и получать их ProcessId. Помимо функции spawnLocal еще есть функция spawn, запускающая процесс на заданной ноде, а также spawnLink и spawnMonitor, которые делают то же самое, что spawn, но еще дополнительно создают линк или монитор (о которых речь пойдет ниже) на созданный процесс. Взаимодействие процессов на разных нодах в рамках этой заметки мы не рассматриваем.

spawnChannelLocal :: Serializable a => (ReceivePort a -> Process ()) -> Process (SendPort a)

Создает новый типизированный канал, запускает новый процесс на локальной ноде, передав ему ReceivePort, и возвращает SendPort. Для запуска процессов на удаленных нодах есть аналогичная функция spawnChannel. Про класс типов Serializable и типизированные каналы речь пойдет ниже.

Обмен сообщениями

Традиционно процессы взаимодействуют путем обмена сообщений.

send :: Serializable a => ProcessId -> a -> Process ()

Кладет сообщение в очередь сообщений заданного процесса. Как уже отмечалось, чтобы тип стал экземпляром класса типов Serializable, достаточно сделать его экземпляром классов Typeable и Binary. Функция send никогда не блокирует вызывающий ее процесс и не бросает исключений, даже если процесса, указанного первым аргументом, не существует.

expect :: Serializable a => Process a

Извлекает из очереди сообщений сообщение заданного типа. Если в очереди такого сообщения нет, процесс блокируется до тех пор, пока оно не поступит. Если в очереди более одного такого сообщения, они извлекаются в порядке FIFO.

expectTimeout :: Serializable a => Int -> Process (Maybe a)

Аналогично expect, но с заданным таймаутом. Таймаут задается в микросекундах (то есть, 10-6, у Марлоу в книге написано «милли», это неверно!). Если указан нулевой таймаут, получаем неблокируемую проверку очереди.

Типизированные каналы

Помимо обмена сообщениями для взаимодействия процессов Cloud Haskell также предоставляет типизированные каналы.

newChan :: Serializable a => Process (SendPort a, ReceivePort a)

Создает новый типизированный однонаправленный канал. Через SendPort пишем, через ReceivePort читаем. Обратите внимание, что SendPort сериализуем, а ReceivePort — нет. Это защищает нас от передачи ReceivePort на другие ноды. Функции linkPort и monitorPort, речь о которых пойдет ниже, работают только с SendPort.

sendChan :: Serializable a => SendPort a -> a -> Process ()

Пишем в канал, аналогично send.

receiveChan :: Serializable a => ReceivePort a -> Process a

Читаем из канала, аналогично expect.

receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a)

Читаем из канала с заданным таймаутом, аналогично expectTimeout.

mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)

Объединяет заданные принимающие порты в один порт. При чтении из такого объединенного порта порты из переданного списка проверяются слева направо и возвращается первое найденное сообщение. Обратите внимание, что после объединения оригинальными портами можно пользоваться, как и раньше. То есть, одновременно читать из оригинальных и объединенных портов не запрещается. Кроме того, можно спокойно объединять пересекающиеся множества портов.

mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)

Аналогично mergePortsBiased, только чтение производится round-robin’ом (отсюда и RR). То есть, порты проверяются слева направо, но после каждого получения сообщения список портов сдвигается на один элемент, в результате чего порт, который был в начале списка, оказывается в его конце.

Сопоставление сообщений

На случай, если вы хотите выбрать из очереди сообщений или каналов сообщение одного из заданных типов, Cloud Haskell предоставляет следующие функции.

match :: Serializable a => (a -> Process b) -> Match b

Принимает функцию, обрабатывающую сообщение типа a и возвращающую тип b, возвращает Match b. Для чего нужен этот Match b, мы увидим чуть ниже.

matchIf :: Serializable a => (a -> Bool) -> (a -> Process b) -> Match b

Аналогично match, но сообщение также должно удовлетворять предикату, переданному первым аргументом.

matchUnknown :: Process b -> Match b

Аналогично match, но нам все равно, какого типа сообщение в очереди.

matchChan :: ReceivePort a -> (a -> Process b) -> Match b

Аналогично match, только для каналов.

receiveWait :: [Match b] -> Process b

А теперь, собственно, ради чего все это затевалось. Принимает список Match b, полученных от функций, описанных выше. Идет по списку и проверят, нет ли в очереди сообщений или канале сообщения, соответствующего заданному элементу списка. Если есть, извлекает сообщение из очереди или канала, выполняет сопоставленное действие и возвращает результат. Другими словами, мы имеем DSL для паттерн матчинга сообщений в мейлбоксе и каналах. Может использоваться например так: receiveWait [match someAction, match someOtherAction].

receiveTimeout :: Int -> [Match b] -> Process (Maybe b)

Аналогично receiveWait, только с заданным таймаутом. Семантика такая же, как и у expectTimeout. На самом деле, expectTimeout реализован через receiveTimeout.

Завершение процессов

Если вам требуется преждевременно завершить процесс, воспользуйтесь одной из следующих функций.

exit :: Serializable a => ProcessId -> a -> Process ()

Вежливый способ завершить заданный процесс. Посылает исключение ProcessExitException с закодированной причиной завершения процесса. Это исключение может быть поймано с помощью функции catchExit и других.

catchExit :: Serializable a => Process b -> (ProcessId -> a -> Process b) -> Process b

Если во время выполнения заданных действий (первый аргумент) возникнет исключение ProcessExitException, вместо завершения процесса будет выполнен заданный хэндлер (второй аргумент). Тип хэндлера должен соответствовать типу причины завершения процесса, иначе исключение не будет поймано.

kill :: ProcessId -> String -> Process ()

Грубый способ завершения заданного процесса. Процесс завершится, если только не прибегнуть к каким-нибудь грязным хакам с перехватом асинхронных исключений.

die :: Serializable a => a -> Process b

Как exit, только примененный к текущему процессу.

terminate :: Process a

Немедленное завершение текущего процесса. Реализовано, как бросание исключения ProcessTerminationException.

Линки и мониторы

В отличие от Erlang, линки и мониторы в Cloud Haskell однонаправленные. Если в случае смерти некого процесса вы хотите просто завершить текущий процесс, используйте линки. Если же вы хотите совершить какие-то более интересные действия, используйте мониторы.

link :: ProcessId -> Process ()

Текущий процесс линкуется к заданному процессу. Вызов этой функции никогда не блокирует выполнение текущего процесса. Когда процесс А прилинкован к процессу Б (то есть, А сказал link pidB), процессу А посылается асинхронное исключение, если (1) процесс Б завершился, нормально или нет, или (2) связь между нодами, на которых работают А и Б, пропала. Линки уникальны. Если попытаться создать еще один линк от А к Б, ничего не произойдет. Если в момент вызова link процессом А процесс Б уже завершился, процесс А также завершится.

unlink :: ProcessId -> Process ()

Удаляет линк к заданному процессу. Функция синхронна, то есть, гарантируется, что при завершении функции unlink текущий процесс не получит исключения, если заданный процесс умрет. Но при этом функция не ожидает ответа от удаленных нод, в этом смысле она асинхронна.

monitor :: ProcessId -> Process MonitorRef

Устанавливает монитор на заданный процесс и возвращает соответствующий MonitorRef. Вызов этой функции никогда не блокирует выполнение текущего процесса. В случае завершения указанного процесса, нормального или нет, или в случае иcчезновения связи между нодами, текущий процесс получит в свой мейлбокс сообщение ProcessMonitorNotification:

data ProcessMonitorNotification
  = ProcessMonitorNotification !MonitorRef !ProcessId !DiedReason

data DiedReason
  = DiedNormal
  | DiedException !String
  | DiedDisconnect
  | DiedNodeDown
  | DiedUnknownId

В отличие от линков, мониторы не уникальны. Процесс А может создать много мониторов на процесс Б. Соответственно, при завершении процесса Б или исчезновении связи с нодой, на которой он работает, процесс А получит столько сообщений, сколько создано мониторов. В случае, если в момент вызова monitor целевой процесс уже завершился, все отработает в соответствии с ожиданиями, как и в случае с link. На самом деле, линки в Cloud Haskell — это частный случай мониторов.

unmonitor :: MonitorRef -> Process ()

Удаляет заданный монитор. C синхронно-асинхронным дуализмом дела обстоят так же, как и в случае с unlink.

withMonitor :: ProcessId -> Process a -> Process a

Создает монитор на заданный процесс на время выполнения заданного действия. Не забывает сделать unmonitor в случае возникновения исключения. Реализован с помощью bracket.

Еще есть функции (un)link[Node|Port] и (de)monitor[Node|Port] с очевидными типами и назначением.

Именованные процессы

Cloud Haskell умеет регистрировать процессы под заданными именами и искать процессы в реестре по имени.

register :: String -> ProcessId -> Process ()

Регистрирует процесс под указанным именем в локальном реестре. Сам процесс при этом может работать на другой ноде. Указанное имя не должно быть занято. В случае ошибки бросается исключение ProcessRegistrationException.

reregister :: String -> ProcessId -> Process ()

Аналогично register, но заменяет существующую регистрацию. Указанное имя уже должно быть зарегистрировано.

unregister :: String -> Process ()

Удаляет процесс из локального реестра. Имя уже должно быть зарегистрировано.

whereis :: String -> Process (Maybe ProcessId)

Ищет в локальном реестре процесс с заданным именем.

nsend :: Serializable a => String -> a -> Process ()

Находит в локальном реестре процесс с заданным именем и посылает ему сообщение.

Если вас интересует работа с удаленными реестрами, посмотрите функции nsendRemote, whereisRemoteAsync и (un|re)registerRemoteAsync.

Прочее

Также мне хотелось бы рассказать о следующих функциях, но они не подошли ни под один из предыдущих разделов.

getSelfPid :: Process ProcessId

Возвращает процессу его ProcessId. Когда процессы обмениваются сообщениями, в сообщениях обычно указывается «адрес отправителя».

getSelfNode :: Process NodeId

Кроме того, ничто не помешает процессу получить NodeId текущей ноды.

say :: String -> Process ()

Посылает сообщение с заданной строкой, а также ProcessId текущего процесса и временем, процессу, зарегистрированному под именем logger. По умолчанию этот процесс пишет принимаемые данные в stderr, но вообще это зависит от реализации бэкенда Cloud Haskell. Используйте say в сочетании с функцией printf для записи отладочных сообщений.

getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo)

Получение информации о заданном процессе:

data ProcessInfo
  = ProcessInfo {infoNode :: NodeId,
                 infoRegisteredNames :: [String],
                 infoMessageQueueLength :: Maybe Int,
                 infoMonitors :: [(ProcessId, MonitorRef)],
                 infoLinks :: [ProcessId]}

Ух, немало получилось, правда? А ведь это мы еще не смотрели на всякую распределенщину, местные футуры и местный OTP! На самом деле, если так подумать, все написанное выше довольно просто, логично и легко запоминается. Дополнительную информацию вы можете найти в документации к Cloud Haskell на Hackage.

Дополнение: Cloud Haskell — резюме и подборка ссылок по теме

Метки: , , .


Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.