Программная транзакционная память в Scala

10 июня 2015

Вот чем мне нравится язык Scala, это тем, что, в отличие от ряда других языков, он не ограничивает выбор программиста. Если нужны мьютексы, используем мьютексы. Хотим атомарные переменные — пожалуйста. Нужны акторы — да вот же они. Агенты, параллельные коллекции — все к вашим услугам, хоть на volatile’ах многопоточность стройте. Ну и, конечно же, в Scala есть STM, которая, как и многие другие удачные решения, были перенесены в язык из мира Haskell.

Поскольку мы уже работали с STM в Haskell, не будем подробно останавливаться на том, что это такое и зачем нужно. Если вы не читали соответствующую заметку, рекомендую ознакомиться хотя бы с первыми тремя-четырьмя ее абзацами. Прочим же напоминаю главное, что нужно помнить об STM. STM дает нам атомарность изменений, вносимых в изменяемое состояние, согласованность данных в любой момент времени, а также изолированность от транзакций, выполняемых параллельно. Работая с STM очень трудно сделать дэдлок (но, как мы уже выясняли, в целом возможно). Если внутри транзакции будет брошено исключение, данные останутся в согласованном состоянии. Кроме того, функции, использующие STM, хорошо компонуются друг с другом.

Все это добро есть в библиотеке ScalaSTM. Пользоваться ей очень просто. Не будем тянуть кота за все его подробности, лучше сразу рассмотрим классический пример со счетами и деньгами на них:

package me.eax.stm_example

import scala.annotation.tailrec
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.stm._
import scala.util._
import scala.concurrent.ExecutionContext.Implicits.global

object AccountStates {
  val accountsNum = 10
  var states: Map[String, Ref[Int]] = {
      for (i <- 0 until accountsNum)
      yield s"account$i" -> Ref(1000)
    }.toMap

  def randomAccount: String = s"account${Random.nextInt(accountsNum)}"

  def transfer(fromAccount: String, toAccount: String, amount: Int)
              (implicit txn: InTxn) {
    states(fromAccount)() = states(fromAccount)() - amount
    states(toAccount)() = states(toAccount)() + amount
  }

  @tailrec
  def runTransactions(threadNumber: Int, transactionNumber: Int) {
    if(transactionNumber <= 0) return
    val fromAccount = randomAccount
    var toAccount = ""
    do toAccount = randomAccount while(fromAccount == toAccount)
    atomic { implicit txn =>
      val amount = Random.nextInt(states(fromAccount).get)
      println(
        s"[$threadNumber-$transactionNumber] Transferring $amount " +
        s"USD from $fromAccount to $toAccount...")
      transfer(fromAccount, toAccount, amount)
    }
    runTransactions(threadNumber, transactionNumber - 1)
  }
}

object StmExample extends App {
  val futuresSeq = {
    for(thrNum <- 1 to 10)
    yield {
      val f = Future { AccountStates.runTransactions(thrNum, 100) }
      f onFailure { case e => println(s"Future $thrNum failed: $e") }
      f
    }
  }

  val fResults = Future.sequence(futuresSeq)
  Await.ready(fResults, Duration.Inf)

  println("FINAL STATE:")
  val finalState = {
    atomic { implicit txn =>
      for((acc, balance) <- AccountStates.states)
      yield acc -> balance()
    }
  }

  for((acc, balance) <- finalState) {
    println(s"$acc -> $balance")
  }

  println(s"TOTAL: ${finalState.values.sum}")
}

В начале мы имеем 10 счетов, на каждом из которых лежит по 1000$. Заметьте, что состоянием счета является не Int, а Ref[Int]. Ref — это такой специальный контейнер, работать с которым можно только внутри транзакции (аналог TVar в Haskell). Сама же транзакция создается с помощью функции atomic. Обратите внимание, как то, что в Haskell решается с помощью монад, в Scala достигается при помощи более понятных и легко компонуемых неявных аргументов. Внутри транзакции c Ref’ами можно работать при помощи x.get() и x.set(...), либо при помощи более лаконичной записи x() и x() = ..., которая делает то же самое. В данном примере создается 10 футур, которые случайным образом бросают деньги со счета на счет таким образом, чтобы счета не уходили в минус. Специально для демонстрации компонуемости кусков кода, работающих с STM, была выделена функция transfer. Перед завершением работы программа выводит состояние всех счетов и суммарное количество денег на них:

FINAL STATE:
account2 -> 27
account6 -> 691
account9 -> 1773
account1 -> 422
account5 -> 1316
account0 -> 671
account8 -> 3481
account4 -> 1272
account7 -> 167
account3 -> 180
TOTAL: 10000

Как видите, денег в системе не прибавилось и не убавилось, что есть именно то, чего мы и ожидали. Обратите внимание, с какой легкостью мы оперируем счетами! Представьте, если бы мы пытались решить ту же самую задачу при помощи акторов, каких-нибудь агентов или мьютексов. Это был бы просто кошмар! Ну или все потоки упирались бы в одно и то же место.

В приведенном примере внутри транзакции использовались некоторые функции с побочными эффектами — nextInt и println. В этом нет ничего плохого, но в общем случае вы не должны так делать, так как код внутри блока atomic вообще может повторяться, а вы вряд ли хотите, чтобы ваша система многократно отправляла письма или типа того. Можете провести эксперимент, изменив код примерно таким образом:

var attempt = 0
atomic { implicit txn =>
  attempt = attempt + 1
  if(attempt > 1) {
    println(s"[$threadNumber] Retried, attempt = $attempt")
  }
  // ...
}

Вы заметите, что большинство транзакций выполняются успешно с первой попытки, но в редких случаях транзакция начинает выполняться сначала, возможно, даже не один раз. Такое происходит в случае, если во время выполнения транзакции использованное состояние было изменено другими потоками. Также программист может сам перезапустить транзакцию с помощью функции retry:

atomic { implicit txn =>
  // ...
  if(someConditionWasViolated(...)) retry
  // ...
}

Следует однако отметить, что при вызове retry транзакция будет выполнена повторно только после изменении Ref’ов, прочитанных перед вызовом retry. Действительно, было бы странно повторять транзакцию вновь и вновь при условии, что ничего не изменилось. Если же перед вызовом retry никакие Ref’ы не читались, вы получите такое исключение:

java.lang.IllegalStateException: explicit retries cannot succeed
  because cumulative read set is empty

За кадром остались транзакционные коллекции вроде TMap и TSet, комбинаторы типа orAtomic, и не только. Но я уверен, что теперь, ознакомившись с основами использования STM в Scala, вы без труда разберетесь в этих и других вопросах самостоятельно.

Ссылки по теме:

А используете ли вы ScalaSTM в своих проектах?

Метки: , , .


Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.