Программная транзакционная память в Haskell

8 июля 2013

Программная транзакционная память (software transactional memory, STM) — это механизм взаимодействия между потоками, имеющий ряд существенных преимуществ перед традиционным подходом с использованием блокировок. Благодаря этой заметке вы узнаете, как работать с STM в Haskell.

Примечание: Прежде, чем продолжать чтение, ознакомьтесь с заметкой Работа с нитями/потоками в Haskell.

Работа с транзакционной памятью похожа на работу с транзакциями в базах данных. Доступ к разделяемым данным осуществляется внутри транзакций. Находясь внутри транзакции, поток не видит изменений, производимых другими потоками. Сохранение совершенных изменений происходит при выходе из транзакции. Если в этот момент выясняется, что данные были изменены другим потоком, транзакция начинает выполняться сначала. Если внутри транзакции будет брошено исключение, данные не будут изменены. STM дает нам атомарность (будет изменено все или ничего), согласованность (данные согласованы до начала транзакции и остаются таковыми после ее завершения) и изолированность (не видно изменений, производимых другими потоками).

Другими словами, мы получаем буквы A, C и I из ACID. Также транзакционная память интересна тем, что, в отличие от традиционных мьютексов и семафоров, она обеспечивает отсутствие дэдлоков в наших программах.

Есть и несколько недостатков. Во-первых, при выполнении длительных транзакций увеличивается вероятность того, что во время сохранения данных они окажутся уже измененными другим потоком и транзакцию придется выполнить повторно. Это может привести к существенному замедлению программы. Для борьбы с этой проблемой рекомендуется писать как можно более короткие транзакции. Во-вторых, в силу самой семантики STM, внутри транзакций не стоит делать что-либо с файлами или сетью. К счастью, Haskell делает невозможным ввод/вывод внутри транзакций, ровно как и работу с разделяемыми данными вне транзакции.

Основные функции для работы с STM следующие:

atomically :: STM a -> IO a

Собственно, выполнение транзакции. Как видите, работа с транзакционной памятью оборачиваются в монаду STM аналогично тому, как взаимодействие с окружающим миром оборачивается в монаду IO.

newTVar :: a -> STM (TVar a)
newTVarIO :: a -> IO (TVar a)

Создание нового TVar внутри транзакции или монады IO. TVars — это такие же контейнеры для разделяемых данных, как и уже знакомые нам MVars, только TVars не могут быть пустыми.

readTVar :: TVar a -> STM a
writeTVar :: TVar a -> a -> STM ()

Чтение и запись из/в TVar внутри транзакции.

modifyTVar :: TVar a -> (a -> a) -> STM ()
modifyTVar' :: TVar a -> (a -> a) -> STM ()
swapTVar :: TVar a -> a -> STM a

Несколько вспомогательных функций с очевидной семантикой и реализацией. Отличие функции modifyTVar' от modifyTVar состоит в том, что она является строгой, а не ленивой.

retry :: STM a

Начинает выполнение транзакции сначала. Эта функция удобна в случае, если во время выполнения транзакции по каким-то признакам стало понятно, что она все равно не будет выполнена удачно. В GHC эта функция обладает одним важным свойством. Она блокирует текущий поток до тех пор, пока не произойдет изменение одной или нескольких переменных, к которым мы обращались до вызова retry. Действительно, непросто прийти к другому результату, если ничего не изменилось.

orElse :: STM a -> STM a -> STM a

О семантики функции нетрудно догадаться по ее названию. Вместо повторного выполнения первой транзакции будет выполнена вторая. Если вторая транзакция также не будет выполнена успешно с первой попытки, вся конструкция целиком будет выполнена повторно.

Помимо TVar пакет stm предлагает нам и другие контейнеры, включая каналы (TChan), очереди (TQueue), очереди ограниченного размера (TBQueue) и массивы (TArray). В пакете stm-chans можно найти дополнительные контейнеры, в том числе канал ограниченного размера (TBChan).

Рассмотрим простейший пример использования STM:

module Main where

import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM

threadLoop :: Int -> TVar Int -> TVar [(Int, Int)] -> IO ()
threadLoop num counter printQueue = do
  cnt <- atomically $ do
           t <- readTVar counter
           let t' = t - 1
           writeTVar counter t'
           return t'
  unless (cnt < 0) $ do
    atomically $ modifyTVar' printQueue (\ q -> (num, cnt) : q)
    threadLoop num counter printQueue

printLoop :: TVar [(Int,Int)] -> IO ()
printLoop printQueue = do
  q <- atomically $ swapTVar printQueue []
  case q of
    [] ->
      printLoop printQueue
    _ -> do
      q `forM_` \t -> print t
      unless (any (\(_, x) -> x == 0) q) $ printLoop printQueue

main :: IO ()
main = do
  counter <- newTVarIO 100
  printQueue <- newTVarIO []
  [1..10] `forM_` \num -> forkIO $ threadLoop num counter printQueue
  printLoop printQueue

Десять потоков одновременно работают с одним счетчиком. Классика жанра. Надеюсь, что тут все понятно. Перейдем к следующему примеру. Попробуем переписать с использованием STM нашу программу для взлома SHA256:

import Bruteforce.Common
import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM
import Control.DeepSeq

workerLoop :: TQueue String -> TQueue [String] -> String -> Int ->
                [String] -> IO ()
workerLoop taskQueue resultQueue charList pwLen hashList = do
  maybeTask <- atomically $ tryReadTQueue taskQueue
  case maybeTask of
    Nothing -> return ()
    Just task -> do
      let postfixList = passwordList charList $ pwLen - length task
          pwList = map (task ++) postfixList
          pwHashList = [(pw, hash pw) | pw <- pwList]
          rslt = [pw ++ ":" ++ h | (pw,h) <- pwHashList,
                                    h `elem` hashList]
      -- как и в прошлый раз, мы должны принудительно вычислить rslt
      -- иначе от многопоточности не будет никакой пользы
      rslt `deepseq` atomically $ writeTQueue resultQueue rslt
      workerLoop taskQueue resultQueue charList pwLen hashList

mainLoop :: TQueue [String] -> Int -> IO ()
mainLoop _ 0 = return ()
mainLoop resultQueue taskNumber = do
  res <- atomically $ readTQueue resultQueue
  mapM_ putStrLn res
  mainLoop resultQueue (taskNumber - 1)

main :: IO ()
main = do
  let hashList = [
        -- 1234
        "03ac674216f3e15c761ee1a5e255f067" ++
        "953623c8b388b4459e13f978d7c846f4",
        -- r2d2
        "8adce0a3431e8b11ef69e7f7765021d3" ++
        "ee0b70fff58e0480cadb4c468d78105f"
        ]
      pwLen = 4
      chunkLen = 2
      charList = ['0'..'9'] ++ ['a'..'z']
      taskList = passwordList charList chunkLen
      taskNumber = length taskList
  workerNumber <- getNumCapabilities
  taskQueue <- newTQueueIO
  resultQueue <- newTQueueIO
  atomically $ taskList `forM_` writeTQueue taskQueue
  workerNumber `replicateM_` forkIO (workerLoop taskQueue resultQueue
                                             charList pwLen hashList)
  mainLoop resultQueue taskNumber

Здесь как нельзя кстати пришелся контейнер TQueue. Интересно отметить, что новая версия программы стала быстрее. В один поток она справляется с задачей за 13.5 сек (против 14.3 сек в старой версии), в четыре потока — за 3.5 сек (против 3.8 сек), а в восемь — за 3.2 сек (против 3.4 сек). Программа стала на 6-8% быстрее, мы имеем куда более богатый выбор контейнеров и функций для работы с ними, а также гарантию постоянной согласованности наших данных и отсутствия дэдлоков. Все эти плюшки были получены без каких-либо усилий с нашей стороны. Крутяк, я считаю.

В действительности, про отсутствие дэдлоков я немного приврал. Насколько я понимаю, дэдлоки возможны в любой многопоточной программе, даже при использовании STM или модели акторов. Дэдлок может возникнуть не только из-за неправильного порядка работы с мьютексами, но и вследствие различного рода логических ошибок. Например, в Erlang дэдлок возникает, когда процесс A шлет запрос процессу B и ждет от него ответа, однако процесс B, получив запрос от процесса А, в свою очередь также шлет запрос процессу А. В результате оба процесса находятся в ожидании ответа, который никогда не придет. Аналогичная ситуация может возникнуть в Haskell при использовании STM и TQueue.

Как и в случае с MVars, при использовании STM в некоторых случаях GHC умеет обнаруживать дэдлоки во время выполнения программы и посылать заблокированным потокам асинхронное исключение BlockedIndefinitelyOnSTM.

Ссылки по теме:

Исходники к заметке вы найдете в этом архиве (файлы StmTest.hs и MainStm.hs). О том, как они собираются, читайте здесь.

Дополнение: Параллелизм в Haskell (гостевой пост Романа Соколовского)

Метки: , , .


Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.