Программная транзакционная память в Scala
10 июня 2015
Вот чем мне нравится язык Scala, это тем, что, в отличие от ряда других языков, он не ограничивает выбор программиста. Если нужны мьютексы, используем мьютексы. Хотим атомарные переменные — пожалуйста. Нужны акторы — да вот же они. Агенты, параллельные коллекции — все к вашим услугам, хоть на volatile’ах многопоточность стройте. Ну и, конечно же, в Scala есть STM, которая, как и многие другие удачные решения, были перенесены в язык из мира Haskell.
Поскольку мы уже работали с STM в Haskell, не будем подробно останавливаться на том, что это такое и зачем нужно. Если вы не читали соответствующую заметку, рекомендую ознакомиться хотя бы с первыми тремя-четырьмя ее абзацами. Прочим же напоминаю главное, что нужно помнить об STM. STM дает нам атомарность изменений, вносимых в изменяемое состояние, согласованность данных в любой момент времени, а также изолированность от транзакций, выполняемых параллельно. Работая с STM очень трудно сделать дэдлок (но, как мы уже выясняли, в целом возможно). Если внутри транзакции будет брошено исключение, данные останутся в согласованном состоянии. Кроме того, функции, использующие STM, хорошо компонуются друг с другом.
Все это добро есть в библиотеке ScalaSTM. Пользоваться ей очень просто. Не будем тянуть кота за все его подробности, лучше сразу рассмотрим классический пример со счетами и деньгами на них:
import scala.annotation.tailrec
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.stm._
import scala.util._
import scala.concurrent.ExecutionContext.Implicits.global
object AccountStates {
val accountsNum = 10
var states: Map[String, Ref[Int]] = {
for (i <- 0 until accountsNum)
yield s"account$i" -> Ref(1000)
}.toMap
def randomAccount: String = s"account${Random.nextInt(accountsNum)}"
def transfer(fromAccount: String, toAccount: String, amount: Int)
(implicit txn: InTxn) {
states(fromAccount)() = states(fromAccount)() - amount
states(toAccount)() = states(toAccount)() + amount
}
@tailrec
def runTransactions(threadNumber: Int, transactionNumber: Int) {
if(transactionNumber <= 0) return
val fromAccount = randomAccount
var toAccount = ""
do toAccount = randomAccount while(fromAccount == toAccount)
atomic { implicit txn =>
val amount = Random.nextInt(states(fromAccount).get)
println(
s"[$threadNumber-$transactionNumber] Transferring $amount " +
s"USD from $fromAccount to $toAccount...")
transfer(fromAccount, toAccount, amount)
}
runTransactions(threadNumber, transactionNumber - 1)
}
}
object StmExample extends App {
val futuresSeq = {
for(thrNum <- 1 to 10)
yield {
val f = Future { AccountStates.runTransactions(thrNum, 100) }
f onFailure { case e => println(s"Future $thrNum failed: $e") }
f
}
}
val fResults = Future.sequence(futuresSeq)
Await.ready(fResults, Duration.Inf)
println("FINAL STATE:")
val finalState = {
atomic { implicit txn =>
for((acc, balance) <- AccountStates.states)
yield acc -> balance()
}
}
for((acc, balance) <- finalState) {
println(s"$acc -> $balance")
}
println(s"TOTAL: ${finalState.values.sum}")
}
В начале мы имеем 10 счетов, на каждом из которых лежит по 1000$. Заметьте, что состоянием счета является не Int, а Ref[Int]. Ref — это такой специальный контейнер, работать с которым можно только внутри транзакции (аналог TVar в Haskell). Сама же транзакция создается с помощью функции atomic. Обратите внимание, как то, что в Haskell решается с помощью монад, в Scala достигается при помощи более понятных и легко компонуемых неявных аргументов. Внутри транзакции c Ref’ами можно работать при помощи x.get()
и x.set(...)
, либо при помощи более лаконичной записи x()
и x() = ...
, которая делает то же самое. В данном примере создается 10 футур, которые случайным образом бросают деньги со счета на счет таким образом, чтобы счета не уходили в минус. Специально для демонстрации компонуемости кусков кода, работающих с STM, была выделена функция transfer. Перед завершением работы программа выводит состояние всех счетов и суммарное количество денег на них:
account2 -> 27
account6 -> 691
account9 -> 1773
account1 -> 422
account5 -> 1316
account0 -> 671
account8 -> 3481
account4 -> 1272
account7 -> 167
account3 -> 180
TOTAL: 10000
Как видите, денег в системе не прибавилось и не убавилось, что есть именно то, чего мы и ожидали. Обратите внимание, с какой легкостью мы оперируем счетами! Представьте, если бы мы пытались решить ту же самую задачу при помощи акторов, каких-нибудь агентов или мьютексов. Это был бы просто кошмар! Ну или все потоки упирались бы в одно и то же место.
В приведенном примере внутри транзакции использовались некоторые функции с побочными эффектами — nextInt и println. В этом нет ничего плохого, но в общем случае вы не должны так делать, так как код внутри блока atomic вообще может повторяться, а вы вряд ли хотите, чтобы ваша система многократно отправляла письма или типа того. Можете провести эксперимент, изменив код примерно таким образом:
atomic { implicit txn =>
attempt = attempt + 1
if(attempt > 1) {
println(s"[$threadNumber] Retried, attempt = $attempt")
}
// ...
}
Вы заметите, что большинство транзакций выполняются успешно с первой попытки, но в редких случаях транзакция начинает выполняться сначала, возможно, даже не один раз. Такое происходит в случае, если во время выполнения транзакции использованное состояние было изменено другими потоками. Также программист может сам перезапустить транзакцию с помощью функции retry:
// ...
if(someConditionWasViolated(...)) retry
// ...
}
Следует однако отметить, что при вызове retry транзакция будет выполнена повторно только после изменении Ref’ов, прочитанных перед вызовом retry. Действительно, было бы странно повторять транзакцию вновь и вновь при условии, что ничего не изменилось. Если же перед вызовом retry никакие Ref’ы не читались, вы получите такое исключение:
because cumulative read set is empty
За кадром остались транзакционные коллекции вроде TMap и TSet, комбинаторы типа orAtomic, и не только. Но я уверен, что теперь, ознакомившись с основами использования STM в Scala, вы без труда разберетесь в этих и других вопросах самостоятельно.
Ссылки по теме:
А используете ли вы ScalaSTM в своих проектах?
Метки: Scala, Параллелизм и многопоточность, Функциональное программирование.
Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.