Введение в Scala.

С нуля до распределенных приложений.

Асинхронные операции. Futures

Scala в основном используется как платформа для построения “живых” распределенных систем. Сложно представить себе живую систему в которой функционирует лишь один агент, поэтому задачи взаимодействия асинхронно работающих агентов являются краеугольным камнем системы.

В этой статье мы рассмотрим один из основных высокоуровневых примитивов для асинхронных операций в Scala.

Futures

Результаты отложенных/параллельных вычислений возвращающих значение типа T в Scala представлены объектами класса Future[T].

Каждый объект класса Future[T] может быть в одном из трех состояний:

  • вычисление не закончено
  • вычисление закончено успешно
  • вычисление закончено с ошибкой

Он создается в состоянии незаконченного вычисления и может быть переведен единственный раз в любое из двух других состояний.

Создать параллельное вычисление можно вызовом специального конструктора объекта-компаньона Future:

val greetWorld: Future[String] = Future("Hello, async world!")

val answerToLife: Future[Int] = Future {
  Thread.sleep(2.3667695e+17.toLong)
  42
}

def loadFileAsync(name: String): Future[String] = Future {
  scala.io.Source.fromFile(name).mkString
}

def loadPageAsync(url: String): Future[String] = Future { 
  scala.io.Source.fromURL(url).mkString
}

Заметим, что на каждый вызов не создается отдельный поток, а задействуется существующий из управляемого пула потоков (ExecutionContext). Для использования пула по умолчанию надо ввести его в зону видимости следующей командой:

import scala.concurrent.ExecutionContext.Implicits.global

Await

Есть несколько способов работы с результатами асинхронной операции:

  • заблокировать текущий поток превратив ее в синхронную операцию
  • зарегистрировать функцию обратного вызова (callback)
  • использовать комбинаторы

Первый подход применяется в самых простых программах и не рекомендуется для использования в серверных приложениях, т.к. он блокирует текущий поток превращая асинхронную операцию в синхронную, но расходующую больше ресурсов. Он заключается в вызове метода Await.result:

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

val greetingFuture = Future {
  Thread.sleep(1000) 
  println("calculating..."); 
  "Hello" 
}

println("Friend")

val greeting = Await.result(greetingFuture, Duration.Inf)

println(s"Result: $greeting")

Результат:

Friend
calculating...
Result: Hello

Обратный вызов

Метод onComplete регистрирует вашу функцию обратного вызова для вызова по завершении асинхронной операции. Параметром функции будет объект класса Try[T] из пакета scala.util.

Этот объект представлен иерархией классов:

abstract class Try[+T] { ... }
case class Success[+T](value: T) extends Try[T] { ... }
case class Failure[+T](exception: Throwable) extends Try[T] { ... }

Соответственно, в случае успеха ваша функция обратного вызова будет вызывана с объектом класса Success, а в случае ошибки - Failure.

Это проще пояснить примером:

val a: Future[String] = Future { Thread.sleep(5 * 1000); "Delayed result" }

a.onComplete {
  case Success(result: String) => println(result)
  case Failure(ex: Exception) => println(s"Operation failed with $ex")
}

println("Immediate result") // отобразится до Delayed result

Более специфичные методы onSuccess и onFailure выполняют операцию в случае успешного и ошибочного завершения операции соответственно.

Современные программы (особенно серверные) зависят одновременно от многих асинхронных операций: сделать запрос к БД, загрузить страницу сайта, загрузить информацию с диска и т.п. Их работу можно согласовывать подписывая функции обратного вызова друг на друга, но это превращается в спагетти-код и становится неуправляемым:

/*
  считаем имя файла из базы данных
  загрузим его с диска
  скачаем страницу адрес которой указан в файле
*/

def queryDb(id: Int): Future[String] = DB.tables.files.fetch(id)

queryDb(8612).onComplete {
  case Failure(ex: Exception) => 
    println(s"Operation failed with $ex")
  case Success(fileName: String) => 
    loadFileAsync(fileName).onComplete {
      case Failure(ex: Exception) => 
        println(s"Operation failed with $ex")
      case Success(url: String) => 
        loadPageAsync(url).onComplete {
          case Failure(ex: Exception) => println(s"Operation failed with $ex")
          case Success(text: String) => println(s"Result: $text")
        }
    }
}

Такой код быстро теряет привлекательность. Выделением каждого обработчика в отдельную функцию можно лишь немного отсрочить неизбежное.

Комбинаторы

К счастью, Future является монадой, а это значит, что у него определены следующие методы позволяющие легко комбинировать операции:

  • map
  • flatMap
  • filter
  • foreach

Перепишем пример выше используя эти комбинаторы:

val combinedFuture: Future[String] = 
  queryDb(8612).
    flatMap(fileName => loadFileAsync(fileName)).
    flatMap(url => loadPageAsync(url)).
    map(pageText => println(pageText))

val result: String = Await.result(combinedFuture, Duration.Inf)
println(result)

// или при помощи for
for {
  fileName <- queryDb(8612)
  url <- loadFileAsync(fileName)
  pageText <- loadPageAsync(url)
} println(pageText)

Дополнительные комбинаторы

На объектах класса Future[T] и в самом объекте-компаньоне Future можно найти другие полезных комбинаторы для основных операций.

Рассмотрим некоторые из них:

.recover

заменит результат Future в случае ошибки

val okGoogle: Future[String] = 
  loadPageAsync("http://google.com").recover {
    // здесь можно перечислить исключения
    case _ => "cached result"
  }

.recoverWith

заменит результат Future на результат другого Future в случае ошибки

val okGoogle: Future[String] = 
  loadPageAsync("http://google.com").recover {
    case _ => loadPageAsync("http://www.google.com")
  }

Future.successful

оборачивает значение в тип Future не используя дополнительного потока

val result: Future[Int] = Future.successful(10)

Future.failed

оборачивает исключение в тип Future не используя дополнительного потока

val result: Future[Int] = Future.failed(new UnsupportedOperationException)

Future.firstCompletedOf

возвращает первый исполненный Future и выбрасывает результаты других

val a = Future { Thread.sleep(1000); 1 }
val b = Future { Thread.sleep(100); 2 }
val b = Future { Thread.sleep(10); 3 }

val result = Future.firstCompletedOf(Seq(a, b, c)) // 3

Future.sequence

объединяет Seq[Future[A]] в Future[Seq[A]], которая завершается, когда успешно завершаются все или как только один завершился с ошибкой

val a = Future { Thread.sleep(1000); 1 }
val b = Future { Thread.sleep(100); 2 }
val b = Future { Thread.sleep(10); 3 }

val result: Future[Seq[Int]] = Future.sequence(Seq(a, b, c)) // (1, 2, 3)

порядок сохраняется

Future.fold/Future.reduce

применяет функциональную свертку к последовательности Future

Контекст

Напоследок стоит заметить, что во многих случаях вам не надо будет покидать “контекст” монады Future вызовами Await.result и onComplete, а все функции интерфейса программы будут лишь трансформировать Future[A] во Future[B].

Например, распространенная MVC платформа Play Framework обслуживает веб-запросы методами вида:

def index = Action.async {
  val textFuture: Future[String] = loadFileAsync("roster.txt")
  textFuture.map(text => Ok(text))
}

где функция Action.async принимает параметром Future[A].