С нуля до распределенных приложений.
Scala в основном используется как платформа для построения “живых” распределенных систем. Сложно представить себе живую систему в которой функционирует лишь один агент, поэтому задачи взаимодействия асинхронно работающих агентов являются краеугольным камнем системы.
В этой статье мы рассмотрим один из основных высокоуровневых примитивов для асинхронных операций в Scala.
Результаты отложенных/параллельных вычислений возвращающих значение типа T в Scala представлены объектами класса Future[T].
Каждый объект класса Future[T] может быть в одном из трех состояний:
Он создается в состоянии незаконченного вычисления и может быть переведен единственный раз в любое из двух других состояний.
Создать параллельное вычисление можно вызовом специального конструктора объекта-компаньона Future:
val greetWorld: Future[String] = Future("Hello, async world!")
val answerToLife: Future[Int] = Future {
Thread.sleep(2.3667695e+17.toLong)
42
}
def loadFileAsync(name: String): Future[String] = Future {
scala.io.Source.fromFile(name).mkString
}
def loadPageAsync(url: String): Future[String] = Future {
scala.io.Source.fromURL(url).mkString
}
Заметим, что на каждый вызов не создается отдельный поток, а задействуется существующий из управляемого пула потоков (ExecutionContext). Для использования пула по умолчанию надо ввести его в зону видимости следующей командой:
import scala.concurrent.ExecutionContext.Implicits.global
Есть несколько способов работы с результатами асинхронной операции:
Первый подход применяется в самых простых программах и не рекомендуется для использования в серверных приложениях, т.к. он блокирует текущий поток превращая асинхронную операцию в синхронную, но расходующую больше ресурсов. Он заключается в вызове метода Await.result:
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
val greetingFuture = Future {
Thread.sleep(1000)
println("calculating...");
"Hello"
}
println("Friend")
val greeting = Await.result(greetingFuture, Duration.Inf)
println(s"Result: $greeting")
Результат:
Friend
calculating...
Result: Hello
Метод onComplete регистрирует вашу функцию обратного вызова для вызова по завершении асинхронной операции. Параметром функции будет объект класса Try[T] из пакета scala.util.
Этот объект представлен иерархией классов:
abstract class Try[+T] { ... }
case class Success[+T](value: T) extends Try[T] { ... }
case class Failure[+T](exception: Throwable) extends Try[T] { ... }
Соответственно, в случае успеха ваша функция обратного вызова будет вызывана с объектом класса Success, а в случае ошибки - Failure.
Это проще пояснить примером:
val a: Future[String] = Future { Thread.sleep(5 * 1000); "Delayed result" }
a.onComplete {
case Success(result: String) => println(result)
case Failure(ex: Exception) => println(s"Operation failed with $ex")
}
println("Immediate result") // отобразится до Delayed result
Более специфичные методы onSuccess и onFailure выполняют операцию в случае успешного и ошибочного завершения операции соответственно.
Современные программы (особенно серверные) зависят одновременно от многих асинхронных операций: сделать запрос к БД, загрузить страницу сайта, загрузить информацию с диска и т.п. Их работу можно согласовывать подписывая функции обратного вызова друг на друга, но это превращается в спагетти-код и становится неуправляемым:
/*
считаем имя файла из базы данных
загрузим его с диска
скачаем страницу адрес которой указан в файле
*/
def queryDb(id: Int): Future[String] = DB.tables.files.fetch(id)
queryDb(8612).onComplete {
case Failure(ex: Exception) =>
println(s"Operation failed with $ex")
case Success(fileName: String) =>
loadFileAsync(fileName).onComplete {
case Failure(ex: Exception) =>
println(s"Operation failed with $ex")
case Success(url: String) =>
loadPageAsync(url).onComplete {
case Failure(ex: Exception) => println(s"Operation failed with $ex")
case Success(text: String) => println(s"Result: $text")
}
}
}
Такой код быстро теряет привлекательность. Выделением каждого обработчика в отдельную функцию можно лишь немного отсрочить неизбежное.
К счастью, Future является монадой, а это значит, что у него определены следующие методы позволяющие легко комбинировать операции:
Перепишем пример выше используя эти комбинаторы:
val combinedFuture: Future[String] =
queryDb(8612).
flatMap(fileName => loadFileAsync(fileName)).
flatMap(url => loadPageAsync(url)).
map(pageText => println(pageText))
val result: String = Await.result(combinedFuture, Duration.Inf)
println(result)
// или при помощи for
for {
fileName <- queryDb(8612)
url <- loadFileAsync(fileName)
pageText <- loadPageAsync(url)
} println(pageText)
На объектах класса Future[T] и в самом объекте-компаньоне Future можно найти другие полезных комбинаторы для основных операций.
Рассмотрим некоторые из них:
заменит результат Future в случае ошибки
val okGoogle: Future[String] =
loadPageAsync("http://google.com").recover {
// здесь можно перечислить исключения
case _ => "cached result"
}
заменит результат Future на результат другого Future в случае ошибки
val okGoogle: Future[String] =
loadPageAsync("http://google.com").recover {
case _ => loadPageAsync("http://www.google.com")
}
оборачивает значение в тип Future не используя дополнительного потока
val result: Future[Int] = Future.successful(10)
оборачивает исключение в тип Future не используя дополнительного потока
val result: Future[Int] = Future.failed(new UnsupportedOperationException)
возвращает первый исполненный Future и выбрасывает результаты других
val a = Future { Thread.sleep(1000); 1 }
val b = Future { Thread.sleep(100); 2 }
val b = Future { Thread.sleep(10); 3 }
val result = Future.firstCompletedOf(Seq(a, b, c)) // 3
объединяет Seq[Future[A]] в Future[Seq[A]], которая завершается, когда успешно завершаются все или как только один завершился с ошибкой
val a = Future { Thread.sleep(1000); 1 }
val b = Future { Thread.sleep(100); 2 }
val b = Future { Thread.sleep(10); 3 }
val result: Future[Seq[Int]] = Future.sequence(Seq(a, b, c)) // (1, 2, 3)
порядок сохраняется
применяет функциональную свертку к последовательности Future
Напоследок стоит заметить, что во многих случаях вам не надо будет покидать “контекст” монады Future вызовами Await.result и onComplete, а все функции интерфейса программы будут лишь трансформировать Future[A] во Future[B].
Например, распространенная MVC платформа Play Framework обслуживает веб-запросы методами вида:
def index = Action.async {
val textFuture: Future[String] = loadFileAsync("roster.txt")
textFuture.map(text => Ok(text))
}
где функция Action.async принимает параметром Future[A].