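Scala can be written in a very complex style or a very concise one. It builds further abstractions on top of Java's concurrency and OO facilities, which cuts the amount of code considerably.
Many recently popular projects use Scala, such as Akka, Spark, Kafka, Spray, and Play Framework, which suggests it is a highly productive language.
This post summarizes some concurrency patterns and the things to watch out for when working with thread pools.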
The Future concurrency pattern in Scala
Everything is a Future:
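import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Future { ... } replaces the deprecated lowercase future { ... }
val mFuture = Future { Thread.sleep(1000); "result" }
// Block for at most 3 seconds while waiting for the result
val result = Await.result(mFuture, 3.seconds)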
Ways to get at the result (a callback, or blocking with Await):
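import scala.util.{Failure, Success}

// Non-blocking: register a callback that runs when the future completes
mFuture.onComplete {
  case Success(result) => // handle the result here
  case Failure(ex)     => // handle the error here
}
// Blocking: wait up to 5 seconds for the result
Await.result(mFuture, 5.seconds)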
Timeout with a fallback:
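import akka.pattern.after

val searchFuture = search(worktime)
// Completes with a fallback message once `timeout` has elapsed
val fallback = after(timeout, context.system.scheduler) {
  Future.successful(s"$worktime ms > $timeout")
}
// Whichever of the two futures completes first wins
Future.firstCompletedOf(Seq(searchFuture, fallback))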
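The same timeout-with-fallback idea can be sketched without Akka, assuming Scala 2.12 or later and only the standard library. Here `after` is a hypothetical helper built on a `Promise` and a daemon `java.util.Timer` (a stand-in for Akka's scheduler), and `search` is a made-up slow operation; neither comes from the original code.

```scala
import java.util.{Timer, TimerTask}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object TimeoutFallback {
  // Daemon timer used only to fire timeouts (assumption: stands in for Akka's scheduler).
  private val timer = new Timer(true)

  // Hypothetical helper: a Future that completes with `value` once `delay` has elapsed.
  def after[T](delay: FiniteDuration)(value: => T): Future[T] = {
    val promise = Promise[T]()
    timer.schedule(new TimerTask { def run(): Unit = promise.complete(Try(value)) }, delay.toMillis)
    promise.future
  }

  // Hypothetical slow operation that takes `worktime` milliseconds.
  def search(worktime: Long): Future[String] =
    Future { Thread.sleep(worktime); s"found after $worktime ms" }

  // Returns the search result, or the fallback message if the timeout fires first.
  def searchWithFallback(worktime: Long, timeout: FiniteDuration): Future[String] =
    Future.firstCompletedOf(Seq(search(worktime), after(timeout)(s"$worktime ms > $timeout")))

  def main(args: Array[String]): Unit = {
    println(Await.result(searchWithFallback(10, 2.seconds), 3.seconds))
    println(Await.result(searchWithFallback(2000, 100.millis), 3.seconds))
  }
}
```

Note that the losing future is not cancelled: the slow search keeps running in the background and its result is simply ignored.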
Transforming the result of a Future into another Future with map / filter:
f.map { r1 => ... r2 }
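As a minimal sketch of map and filter on a Future, assuming the global ExecutionContext: the object and method names below are made up for illustration. `map` transforms a successful value, while `filter` keeps the value only if the predicate holds and otherwise fails the resulting future with a `NoSuchElementException`.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object MapFilter {
  // map: Future[String] becomes Future[Int] once the string is available.
  def parsed: Future[Int] = Future("42").map(_.toInt)

  // filter: the resulting future fails when the predicate is false.
  def evenOnly(n: Int): Future[Int] = Future(n).filter(_ % 2 == 0)

  def main(args: Array[String]): Unit = {
    println(Await.result(parsed, 1.second))
    println(Await.ready(evenOnly(3), 1.second).value)
  }
}
```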
Chaining multiple Futures with flatMap:
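// map with a function that itself returns a Future produces a nested Future
val nestedFuture: Future[Future[Boolean]] = heatWater(Water(25)).map { water =>
  temperatureOkay(water)
}
// flatMap flattens the result into a single Future
val flatFuture: Future[Boolean] = heatWater(Water(25)).flatMap { water =>
  temperatureOkay(water)
}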
Another way to combine a chain of Futures, using a for comprehension:
val f = for {
  result1 <- remoteCall1
  result2 <- remoteCall2
} yield List(result1, result2)
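If the futures are created before the for comprehension, they run concurrently; if they are created inside it, they run one after the other. In the sequential case, result1 can also be passed as an argument to remoteCall2.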
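The difference between the two forms can be sketched as follows, assuming the global ExecutionContext. `remoteCall` is a hypothetical stand-in that sleeps for 300 ms, and `timed` is an illustrative helper for measuring elapsed time.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ForComprehension {
  // Hypothetical remote call: takes about 300 ms and returns its argument.
  def remoteCall(name: String): Future[String] = Future { Thread.sleep(300); name }

  // Futures created BEFORE the for: both are already running, so they can overlap.
  def concurrent(): Future[List[String]] = {
    val f1 = remoteCall("a")
    val f2 = remoteCall("b")
    for { r1 <- f1; r2 <- f2 } yield List(r1, r2)
  }

  // Futures created INSIDE the for: the second call starts only after the first completes,
  // which is also why r1 can be used as an argument to it.
  def sequential(): Future[List[String]] =
    for {
      r1 <- remoteCall("a")
      r2 <- remoteCall(r1 + "b")
    } yield List(r1, r2)

  // Waits for the future and returns its result together with the elapsed milliseconds.
  def timed[T](f: => Future[T]): (T, Long) = {
    val start = System.nanoTime()
    val result = Await.result(f, 5.seconds)
    (result, (System.nanoTime() - start) / 1000000)
  }

  def main(args: Array[String]): Unit = {
    println(timed(concurrent()))
    println(timed(sequential()))
  }
}
```

The sequential version needs at least two sleeps back to back. The concurrent version only overlaps them when the pool has two or more threads available, so on a single-core machine the timings may look similar.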
Converting a List[Future[T]] into a Future[List[T]]:
Future.sequence(list) // the futures in the list run concurrently
Reducing a list of Futures into a single Future:
Future.reduce(list)(f) // deprecated since Scala 2.12, where Future.reduceLeft(list)(f) replaces it
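A small sketch of both conversions, assuming Scala 2.12 or later (where `Future.reduceLeft` is available) and the global ExecutionContext. The object and value names are illustrative only.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceReduce {
  // Three independent futures; each starts running as soon as it is created.
  def futures: List[Future[Int]] = List(1, 2, 3).map(n => Future(n * 10))

  // List[Future[Int]] => Future[List[Int]], preserving the order of the list.
  def all: Future[List[Int]] = Future.sequence(futures)

  // List[Future[Int]] => Future[Int], combining the results pairwise.
  def sum: Future[Int] = Future.reduceLeft(futures)(_ + _)

  def main(args: Array[String]): Unit = {
    println(Await.result(all, 1.second))
    println(Await.result(sum, 1.second))
  }
}
```

If any future in the list fails, both `all` and `sum` fail as well.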
Option and getOrElse are often used when working with Futures.
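One common shape, sketched here with made-up names, is a lookup that returns `Future[Option[T]]` and a caller that supplies a default with `getOrElse` inside `map`. The in-memory `users` map and the `"anonymous"` default are assumptions for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object OptionInFuture {
  // Hypothetical data source standing in for a real lookup.
  private val users = Map(1 -> "alice")

  // The lookup may find nothing, so the result is wrapped in an Option.
  def findUser(id: Int): Future[Option[String]] = Future(users.get(id))

  // getOrElse supplies a default once the future has completed.
  def userName(id: Int): Future[String] = findUser(id).map(_.getOrElse("anonymous"))

  def main(args: Array[String]): Unit = {
    println(Await.result(userName(1), 1.second))
    println(Await.result(userName(2), 1.second))
  }
}
```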
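ExecutionContext: what actually runs concurrent tasks in Scala
Scala's ExecutionContext is very similar in concept to a Java thread pool: both are the component that actually executes tasks.
The default thread pool in Scala: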
import scala.concurrent.ExecutionContext.Implicits.global
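This is the most convenient option: if you are not concerned with tuning or performance, just import it wherever an ExecutionContext is required. It does not hold up in production, though. If tasks block all the threads of the global pool, the application becomes unresponsive, even when the blocking does not originate in the application itself, for example when it is caused by a database operation.
The size of the default thread pool can be changed with JVM system properties: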
-Dscala.concurrent.context.numThreads=8 -Dscala.concurrent.context.maxThreads=8
Custom thread pools
Creating a fixed-size thread pool:
import java.util.concurrent.Executors
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
If you are familiar with Java thread pools, creating one here works exactly as it does in Java, so the same code can be reused.
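A minimal sketch of running Futures on such a pool and shutting it down afterwards, assuming a pool of two threads created with the default Java thread factory. `FixedPoolExample` and `run` are illustrative names.

```scala
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FixedPoolExample {
  // Runs four small tasks on a two-thread pool and returns the names of the threads used.
  def run(): Set[String] = {
    val pool: ExecutorService = Executors.newFixedThreadPool(2)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val names = Future.sequence(List.fill(4)(Future(Thread.currentThread().getName)))
      Await.result(names, 5.seconds).toSet
    } finally {
      // Unlike the global pool, these threads are not daemons: shut the pool down
      // explicitly, otherwise the JVM will not exit.
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```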
Dispatchers in Scala & Akka
Defining a Dispatcher:
my-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "fork-join-executor"
  # Configuration for the fork join pool
  fork-join-executor {
    # Min number of threads to cap factor-based parallelism number to
    parallelism-min = 2
    # Parallelism (threads) ... ceil(available processors * factor)
    parallelism-factor = 2.0
    # Max number of threads to cap factor-based parallelism number to
    parallelism-max = 10
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 100
}
Or, using a thread pool executor instead:
my-thread-pool-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "thread-pool-executor"
  # Configuration for the thread pool
  thread-pool-executor {
    # minimum number of threads to cap factor-based core number to
    core-pool-size-min = 2
    # No of core threads ... ceil(available processors * factor)
    core-pool-size-factor = 2.0
    # maximum number of threads to cap factor-based number to
    core-pool-size-max = 10
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 100
}
Looking up the previously defined Dispatcher from Scala code:
implicit val executionContext = system.dispatchers.lookup("my-dispatcher")
Assigning a dispatcher to a specific Actor:
val myActor = context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), "myactor1")
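Best practices
Isolate different kinds of work on separate dispatchers:
Create a dedicated thread pool for blocking I/O:
Because JDBC has no non-blocking API, create separate dispatchers for blocking operations such as ordinary database reads and writes and heavy reads and writes, and run each kind of Future computation on its own dispatcher.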
object Contexts {
  implicit val simpleDbLookups: ExecutionContext = Akka.system.dispatchers.lookup("contexts.simple-db-lookups")
  implicit val expensiveDbLookups: ExecutionContext = Akka.system.dispatchers.lookup("contexts.expensive-db-lookups")
  implicit val dbWriteOperations: ExecutionContext = Akka.system.dispatchers.lookup("contexts.db-write-operations")
  implicit val expensiveCpuOperations: ExecutionContext = Akka.system.dispatchers.lookup("contexts.expensive-cpu-operations")
}
Likewise, CPU-intensive computations should get their own thread pool.
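The isolation idea can also be sketched without Akka, using two plain Java thread pools and passing each ExecutionContext explicitly. The pool sizes, `loadRow`, and `checksum` below are assumptions chosen for illustration, not recommendations.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object IsolatedContexts {
  // Pool reserved for blocking I/O such as JDBC calls (size is an arbitrary assumption).
  val blockingIo: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

  // Pool reserved for CPU-bound work, sized to the number of available cores.
  val cpuBound: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(
      Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors()))

  // Hypothetical blocking lookup; it only ever occupies threads of blockingIo.
  def loadRow(id: Int): Future[String] =
    Future { Thread.sleep(50); s"row-$id" }(blockingIo)

  // Hypothetical CPU-bound step; it runs on cpuBound.
  def checksum(s: String): Future[Int] =
    Future { s.map(_.toInt).sum }(cpuBound)

  // The continuation after the blocking call is scheduled on the CPU pool.
  def loadAndChecksum(id: Int): Future[Int] =
    loadRow(id).flatMap(checksum)(cpuBound)

  // Both pools use non-daemon threads and must be shut down explicitly.
  def shutdown(): Unit = { blockingIo.shutdown(); cpuBound.shutdown() }

  def main(args: Array[String]): Unit = {
    try println(Await.result(loadAndChecksum(1), 2.seconds))
    finally shutdown()
  }
}
```

Passing the context explicitly, rather than importing several implicit contexts at once, makes it visible at each call site which pool a piece of work runs on.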
Further reading
http://doc.akka.io/docs/akka/snapshot/scala/dispatchers.html
https://www.playframework.com/documentation/2.4.x/ThreadPools
http://stackoverflow.com/questions/26593989/how-to-achieve-high-concurrency-with-spray-io-in-this-future-and-thread-sleep-ex
http://docs.scala-lang.org/overviews/core/futures.html
https://github.com/alexandru/scala-best-practices/blob/master/sections/4-concurrency-parallelism.md
http://www.javacodegeeks.com/2013/08/future-composition-with-scala-and-akka.html
http://danielwestheide.com/blog/2013/01/09/the-neophytes-guide-to-scala-part-8-welcome-to-the-future.html
http://nerd.kelseyinnis.com/blog/2013/11/12/idiomatic-scala-the-for-comprehension/
http://loicdescotte.github.io/posts/scala-compose-option-future/
http://buransky.com/scala/scala-for-comprehension-with-concurrently-running-futures/