Перестаем бояться футур (futures) в Scala

31 октября 2014

Очень трудно обслуживать 10 000 TCP соединений, если на каждое соединение вы создаете процесс или нитку операционной системы. Вот почему в Erlang есть акторы, в Go и Haskell — легковесные процессы, в Clojure — core.async и pulsar, а в OCaml — LWT. Для решения аналогичных проблем в Scala есть футуры.

Футуры в Scala

JVM не предлагает нам легковесных процессов. Точнее, когда-то она их вроде бы даже предлагала, а некоторые реализации JVM предлагают их и до сих пор. Но в традиционной виртуальной машине от Oracle ничего такого нет. Будучи языком под JVM, Scala также лишена каких-либо легковесных процессов. Вот почему возникла потребность в каких-то там футурах.

Лично мне нравится думать о футурах, как о небольших кусочках работы или лямбдочках. Где-то глубоко под слоями абстракции есть пул ниток операционной системы и очередь из лямбдочек. Как только какой-то из ниток нечего делать, она берет из очереди первую попавшуюся лямбдочку и выполняет ее. Во время выполнения лямбдочки нитка ни на что не отвлекается. Когда работа выполнена, нитка записывает куда-то результат и берется за следующий кусочек работы, если таковой имеется. Притом каждый кусочек работы в свою очередь может создавать новые кусочки работы.

На самом деле, все несколько сложнее. В частности, для работы с футурами предусмотрено множество удобных комбинаторов. Некоторые из них будут рассмотрены в этой заметке.

Но сначала нам понадобятся следующие импорты:

import scala.concurrent.{Future, Await}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

Обратите особое внимание на последний импорт. Именно благодаря ему неявным (implicit) образом передается тот самый пул ниток всем функциям, которые в этом пуле нуждаются.

Создадим нашу первую футуру:

val sumF = Future {
  (1L to 100000L).sum
}

Переменная sumF имеет тип Future[Long]. Создание футуры происходит мгновенно и где-то там в фоне начинают происходить вычисления. Мы можем получить результат этих вычислений, повесив хендлер на успешное завершение футуры:

sumF onSuccess {
  case s => println(s"s = $s")
}

Здесь переменная s имеет тип Long. Если мы оставим код как есть и запустим программу, ничего никуда не будет выведено. Так происходит по той причине, что наш хендлер выполняется асинхронно, когда футура будет вычислена. Если программа завершится раньше времени, хендлер вызван не будет. Чтобы увидеть результат, можно написать Thread.sleep(1000). Понятное дело, в реальных программах так делать не стоит.

Еще один заслуживающий внимания комбинатор помимо onSuccess — это flatMap. Благодаря ему можно создать футуру, которой требуется результат вычисления другой футуры. Например:

// или лучше так: val doubledSumF = sumF.map(_ * 2)
val doubledSumF = sumF.flatMap {
  case s => Future { s * 2  }
}

val tripledSumF = sumF.flatMap {
  case s => Future { s * 3 }
}

Это просто пример, поэтому на секунду представим, что умножение лонгов — достаточно тяжелая операция, чтобы оправдать накладные расходы на создание футуры :) Переменные doubledSumF и tripledSumF имеют тип Future[Long]. Никакого блокирование в этом коде нет, все происходит асинхронно.

Помимо flatMap над футурами также можно делать map и filter. За счет этого можно использовать футуры в for comprehension примерно таким образом:

val resultF = for {
    s1 <- doubledSumF
    s2 <- tripledSumF
  } yield s1 + s2

Здесь s1 и s2 имеют тип Long, а resultF — тип Future[Long]. На самом деле, это просто более удобная запись из map, flatMap и filter. Проницательный читатель также без труда увидит в этом коде монады. Тут важно отметить, что все действия в for comprehension выполняются последовательно. Соответственно, если вы хотите, чтобы ваши футуры вычислялись параллельно, их следует создавать вне for’а, как в этом примере мы сделали с doubledSumF и tripledSumF.

Приведенный выше код был весь из себя такой параллельный и асинхронный. Если вы точно знаете, что делаете, и хотите получить результат вычисления футуры блокируемым образом, вот как это можно сделать:

val result = Await.result(resultF, 5.seconds)
println(s"result = $result")

Здесь result имеет тип Long.

Надеюсь, вы обратили внимание, как легко с помощью футур можно из обычного последовательного кода получить параллельный и/или асинхронный? Средства, предлагаемые другими языками, несколько менее удобны в этом плане.

Весь код целиком:

#!/usr/bin/env scala

import scala.concurrent.{Future, Await}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val sumF = Future {
  (1L to 100000L).sum
}
 
sumF onSuccess {
  case s => println(s"s = $s")
}

// или лучше так: val doubledSumF = sumF.map(_ * 2)
val doubledSumF = sumF.flatMap {
  case s => Future { s * 2 }
}

val tripledSumF = sumF.flatMap {
  case s => Future { s * 3 }
}

val resultF = for {
  s1 <- doubledSumF
  s2 <- tripledSumF
} yield s1 + s2

val result = Await.result(resultF, 5.seconds)
println(s"result = $result")

В контексте футур можно рассказать еще много о чем интересном. В частности, о комбинаторе sequence, с помощью которого можно получить Future[Seq[X]] из Seq[Future[X]]. Или blocking, предназначенном для оборачивания блокируемого куска кода, и о том, как он работает. Или о промисах (promises) — контейнерах, в которые можно записывать футуры, но только один раз. Но во всех этих вопросах вы сами без особого труда разберетесь при помощи ссылок на дополнительные материалы, которые будут приведены далее.

Футуры против легковесных процессов

На первый взгляд может показаться, что футуры, по сравнению с акторами в Erlang или легковесными процессами в Go, это как-то сложно и вообще костыль. Отчасти это правда, но только отчасти. Действительно, футуры несколько сложнее для понимания. Здесь не происходит подсчет редукций и переключение на другой процесс, если текущий выполнялся слишком долго, как, например, это происходит в Erlang. В самом деле, кода приходится писать чуть больше.

Важно понимать, что футуры — это очень другой подход по сравнению с Erlang, Haskell или Go. Он не лучше и не хуже, он просто другой. Футуры сложнее, но дают программисту куда большую гибкость. Если хотите, каждой вашей функции будет соответствовать одна футура. Получится как в Go или core.async. Даже дополнительного кода как такового не будет. Или вы можете обернуть десять функций в одну футуру, тем самым гарантируя, что запрос пользователя обработается за N мс, без прерывания где-то посередине на обработку пары сотен других запросов, чего вам никогда не позволит Erlang. (На самом деле, чтобы это работало, нужно произвести кое-какие махинации на уровне ОС (см раз, два, три), так как нитки тоже могут вытесняться). Это может быть важно не только с точки зрения каких-то гарантий касательно времени обработки запроса, но и в случае, если вам нужны более-менее правдоподобные метрики.

Плюс ко всему этому вы сами управляете execution context и можете даже использовать блокирующий код, не боясь, что у вас что-то где-то залипнет. Таким блокирующим кодом может быть сторонняя сишная библиотека. Притом, почему бы этой библиотеке не использовать случайно TLS? Или представьте, что ваша программа делает много тяжелых вычислений, например, обучает нейронные сети. Если все потоки легковесные, то накладные расходны на их шедулинг могут существенно замедлить вычисления. А вот если в языке футуры, можно использовать как обычные трэды ОС, так и трэдпул. Запускаем код в отдельной нитке, и никаких проблем! Реализовать поддержку как блокирующих, так и неблокирующих вызовов без футур довольно непросто. Как вариант, можно предусмотреть какой-то особый API в стиле «а вот этот легковесный поток на самом деле будет работать на выделенной нитке». Но тогда блокирующие и неблокирующие вызовы внешне становятся неразличимы, что может привести к серьезным и трудным в обнаружении ошибкам. Плюс не понятно, действительно ли в этом случае уходят все накладные расходы, ведь внутри легковесные потоки все равно работают схожим с футурами образом. В Haskell блокирующий код выполняется на отдельном трэдпуле. Однако не очень понятно, что происходит в случае, когда блокируются все нитки из этого трэдпула. Не может же он расти до бесконечности.

Кстати, про различимость блокирующих и неблокирующих вызовов. Еще одно интересное свойство футур заключается в том, что при их использовании вы по типу функции можете сказать, имеет ли она побочные эффекты. Это, конечно, не полный аналог монады IO из мира Haskell, так как код, который выглядит чистым, все еще может, например, использовать ГСЧ. Но как раз ГСЧ и подобные вещи на практике нас мало интересуют, в отличие от хождения в базы данных и запуска ракет. Мне нравиться думать о Future, как об «IO в буквальном смысле», то есть, вводе-выводе, а не вообще всех побочных эффектах. Это даже удобнее, чем IO, если так подумать.

Заключение

Дополнительные материалы:

Лично мне очень нравится, что футуры вообще не привязаны к RTS, как это сделано в Erlang/Go/Haskell. Следовательно, они могут развиваться независимо, и вообще, в любой момент быть заменены на альтернативную реализацию. В частности, ребята из Twitter используют свои собственные футуры. Подход, аналогичный футурам в Scala, может быть применен в любом языке, где можно худо-бедно работать с потоками операционной системы. Не требуется тащить легковесные процессы в ядро языка, можно успешно реализовать их в виде библиотеки. При этом мы автоматически получаем производительность JIT компилятора или даже нативного кода. Чего нет в Erlang, и я сомневаюсь, что будет, так как для этого придется очень сильно поломать семантику тамошних акторов.

Повторюсь, футуры — это очень другой подход.

Дополнение: Основы работы с акторами в Akka

Метки: , .


Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.