Программная транзакционная память в Haskell
8 июля 2013
Программная транзакционная память (software transactional memory, STM) — это механизм взаимодействия между потоками, имеющий ряд существенных преимуществ перед традиционным подходом с использованием блокировок. Благодаря этой заметке вы узнаете, как работать с STM в Haskell.
Примечание: Прежде, чем продолжать чтение, ознакомьтесь с заметкой Работа с нитями/потоками в Haskell.
Работа с транзакционной памятью похожа на работу с транзакциями в базах данных. Доступ к разделяемым данным осуществляется внутри транзакций. Находясь внутри транзакции, поток не видит изменений, производимых другими потоками. Сохранение совершенных изменений происходит при выходе из транзакции. Если в этот момент выясняется, что данные были изменены другим потоком, транзакция начинает выполняться сначала. Если внутри транзакции будет брошено исключение, данные не будут изменены. STM дает нам атомарность (будет изменено все или ничего), согласованность (данные согласованы до начала транзакции и остаются таковыми после ее завершения) и изолированность (не видно изменений, производимых другими потоками).
Другими словами, мы получаем буквы A, C и I из ACID. Также транзакционная память интересна тем, что, в отличие от традиционных мьютексов и семафоров, она обеспечивает отсутствие дэдлоков в наших программах.
Есть и несколько недостатков. Во-первых, при выполнении длительных транзакций увеличивается вероятность того, что во время сохранения данных они окажутся уже измененными другим потоком и транзакцию придется выполнить повторно. Это может привести к существенному замедлению программы. Для борьбы с этой проблемой рекомендуется писать как можно более короткие транзакции. Во-вторых, в силу самой семантики STM, внутри транзакций не стоит делать что-либо с файлами или сетью. К счастью, Haskell делает невозможным ввод/вывод внутри транзакций, ровно как и работу с разделяемыми данными вне транзакции.
Основные функции для работы с STM следующие:
Собственно, выполнение транзакции. Как видите, работа с транзакционной памятью оборачиваются в монаду STM аналогично тому, как взаимодействие с окружающим миром оборачивается в монаду IO.
newTVarIO :: a -> IO (TVar a)
Создание нового TVar внутри транзакции или монады IO. TVars — это такие же контейнеры для разделяемых данных, как и уже знакомые нам MVars, только TVars не могут быть пустыми.
writeTVar :: TVar a -> a -> STM ()
Чтение и запись из/в TVar внутри транзакции.
modifyTVar' :: TVar a -> (a -> a) -> STM ()
swapTVar :: TVar a -> a -> STM a
Несколько вспомогательных функций с очевидной семантикой и реализацией. Отличие функции modifyTVar'
от modifyTVar
состоит в том, что она является строгой, а не ленивой.
Начинает выполнение транзакции сначала. Эта функция удобна в случае, если во время выполнения транзакции по каким-то признакам стало понятно, что она все равно не будет выполнена удачно. В GHC эта функция обладает одним важным свойством. Она блокирует текущий поток до тех пор, пока не произойдет изменение одной или нескольких переменных, к которым мы обращались до вызова retry
. Действительно, непросто прийти к другому результату, если ничего не изменилось.
О семантики функции нетрудно догадаться по ее названию. Вместо повторного выполнения первой транзакции будет выполнена вторая. Если вторая транзакция также не будет выполнена успешно с первой попытки, вся конструкция целиком будет выполнена повторно.
Помимо TVar пакет stm предлагает нам и другие контейнеры, включая каналы (TChan), очереди (TQueue), очереди ограниченного размера (TBQueue) и массивы (TArray). В пакете stm-chans можно найти дополнительные контейнеры, в том числе канал ограниченного размера (TBChan).
Рассмотрим простейший пример использования STM:
import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM
threadLoop :: Int -> TVar Int -> TVar [(Int, Int)] -> IO ()
threadLoop num counter printQueue = do
cnt <- atomically $ do
t <- readTVar counter
let t' = t - 1
writeTVar counter t'
return t'
unless (cnt < 0) $ do
atomically $ modifyTVar' printQueue (\ q -> (num, cnt) : q)
threadLoop num counter printQueue
printLoop :: TVar [(Int,Int)] -> IO ()
printLoop printQueue = do
q <- atomically $ swapTVar printQueue []
case q of
[] ->
printLoop printQueue
_ -> do
q `forM_` \t -> print t
unless (any (\(_, x) -> x == 0) q) $ printLoop printQueue
main :: IO ()
main = do
counter <- newTVarIO 100
printQueue <- newTVarIO []
[1..10] `forM_` \num -> forkIO $ threadLoop num counter printQueue
printLoop printQueue
Десять потоков одновременно работают с одним счетчиком. Классика жанра. Надеюсь, что тут все понятно. Перейдем к следующему примеру. Попробуем переписать с использованием STM нашу программу для взлома SHA256:
import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM
import Control.DeepSeq
workerLoop :: TQueue String -> TQueue [String] -> String -> Int ->
[String] -> IO ()
workerLoop taskQueue resultQueue charList pwLen hashList = do
maybeTask <- atomically $ tryReadTQueue taskQueue
case maybeTask of
Nothing -> return ()
Just task -> do
let postfixList = passwordList charList $ pwLen - length task
pwList = map (task ++) postfixList
pwHashList = [(pw, hash pw) | pw <- pwList]
rslt = [pw ++ ":" ++ h | (pw,h) <- pwHashList,
h `elem` hashList]
-- как и в прошлый раз, мы должны принудительно вычислить rslt
-- иначе от многопоточности не будет никакой пользы
rslt `deepseq` atomically $ writeTQueue resultQueue rslt
workerLoop taskQueue resultQueue charList pwLen hashList
mainLoop :: TQueue [String] -> Int -> IO ()
mainLoop _ 0 = return ()
mainLoop resultQueue taskNumber = do
res <- atomically $ readTQueue resultQueue
mapM_ putStrLn res
mainLoop resultQueue (taskNumber - 1)
main :: IO ()
main = do
let hashList = [
-- 1234
"03ac674216f3e15c761ee1a5e255f067" ++
"953623c8b388b4459e13f978d7c846f4",
-- r2d2
"8adce0a3431e8b11ef69e7f7765021d3" ++
"ee0b70fff58e0480cadb4c468d78105f"
]
pwLen = 4
chunkLen = 2
charList = ['0'..'9'] ++ ['a'..'z']
taskList = passwordList charList chunkLen
taskNumber = length taskList
workerNumber <- getNumCapabilities
taskQueue <- newTQueueIO
resultQueue <- newTQueueIO
atomically $ taskList `forM_` writeTQueue taskQueue
workerNumber `replicateM_` forkIO (workerLoop taskQueue resultQueue
charList pwLen hashList)
mainLoop resultQueue taskNumber
Здесь как нельзя кстати пришелся контейнер TQueue. Интересно отметить, что новая версия программы стала быстрее. В один поток она справляется с задачей за 13.5 сек (против 14.3 сек в старой версии), в четыре потока — за 3.5 сек (против 3.8 сек), а в восемь — за 3.2 сек (против 3.4 сек). Программа стала на 6-8% быстрее, мы имеем куда более богатый выбор контейнеров и функций для работы с ними, а также гарантию постоянной согласованности наших данных и отсутствия дэдлоков. Все эти плюшки были получены без каких-либо усилий с нашей стороны. Крутяк, я считаю.
В действительности, про отсутствие дэдлоков я немного приврал. Насколько я понимаю, дэдлоки возможны в любой многопоточной программе, даже при использовании STM или модели акторов. Дэдлок может возникнуть не только из-за неправильного порядка работы с мьютексами, но и вследствие различного рода логических ошибок. Например, в Erlang дэдлок возникает, когда процесс A шлет запрос процессу B и ждет от него ответа, однако процесс B, получив запрос от процесса А, в свою очередь также шлет запрос процессу А. В результате оба процесса находятся в ожидании ответа, который никогда не придет. Аналогичная ситуация может возникнуть в Haskell при использовании STM и TQueue.
Как и в случае с MVars, при использовании STM в некоторых случаях GHC умеет обнаруживать дэдлоки во время выполнения программы и посылать заблокированным потокам асинхронное исключение BlockedIndefinitelyOnSTM.
Ссылки по теме:
- Глава Software transactional memory в «Real World Haskell»;
- Туториал по STM от Саймона Пейтона Джонса;
- Some examples of Software Transactional Memory in Haskell;
- Статья Composable Memory Transactions (PDF) помимо прочего интересна тем, что авторы делают акцент на облегчении компоновки различных программных компонентов в случае использования STM;
Исходники к заметке вы найдете в этом архиве (файлы StmTest.hs и MainStm.hs). О том, как они собираются, читайте здесь.
Дополнение: Параллелизм в Haskell (гостевой пост Романа Соколовского)
Метки: Haskell, Параллелизм и многопоточность, Функциональное программирование.
Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.