

在编写复杂、耗时的应用程序时,我们经常会使用多线程以及并发来降低响应时间或者提高性能。可惜,传统的并发解决方案导致了一些问题,如线程安全、竞态条件、死锁、活锁以及不容易理解的、容易出错的代码。共享的可变性是罪魁祸首。 避免共享的可变性,便已经规避了许多问题。但是如何避免呢?这就是Actor模型发挥作用的地方。Actor帮助我们将共享的可变性转换为隔离的可变性(isolated mutability)。Actor是保证互斥访问的活动对象。没有两个线程会同时处理同一个Actor。由于这种天然的互斥行为,所有存储在Actor中的数据都自动是线程安全的——不需要任何显式的同步。 如果能将一个任务有意义地分解为几个子任务,即分而治之,就可以使用Actor模型来解决这个问题,设计良好又清晰,并且避免了通常的并发问题。

1. 顺序探索目录


  import java.io.File

  def getChildren(file: File) = {
      val children = file.listFiles()
      if (children != null) children toList else List()
  }                                               //> getChildren: (file: java.io.File)List[java.io.File]

  val start = System.nanoTime                     //> start  : Long = 56001752967194
  val exploreFrom = new File("/Users/baidu/Documents")
                                                  //> exploreFrom  : java.io.File = /Users/baidu/Documents
  var count = 0L                                  //> count  : Long = 0
  var filesToVisit = List(exploreFrom)            //> filesToVisit  : List[java.io.File] = List(/Users/baidu/Documents)

  while (filesToVisit.nonEmpty) {
      val head = filesToVisit.head
      filesToVisit = filesToVisit.tail

      val children = getChildren(head)
      //println(s"$head : ${children.mkString}")
      count = count + children.count{! _.isDirectory }
      filesToVisit = filesToVisit ::: children.filter{ _.isDirectory }

  val end = System.nanoTime                       //> end  : Long = 56008302310770
  println(s"Number of files found:$count")        //> Number of files found:181893
  println(s"Time taken: ${(end - start)/1.0e9} seconds")
                                                  //> Time taken: 6.549343576 seconds

这个例子可以正常工作,没有问题。唯一的问题可能是性能不足,在研究如何提高性能之前,先简单介绍下 Actor.

2. Actor例子-1

为了创建使用Actor的例子,我们需要先下载 akka 的包,从https://doc.akka.io/downloads/可以找到。


  import akka.actor._
  import scala.concurrent.Await
  import scala.concurrent.duration.Duration

  // 要创建一个 Actor,需要继承 Actor 特质并实现 receive() 方法
  class HollywoodActor() extends Actor {
    def receive: Receive = {
      case message => println(s"$message - ${Thread.currentThread}")

  // Akka的Actor托管在一个ActorSystem中,它管理了线程、消息队列以及Actor的生命周期。相对于使用传统的new关键字来创建实例,我们使用了一种特殊的actorOf工厂方法来创建Actor,并将其对应的ActorRef赋值给了名为depp/hanks的引用。
  val system = ActorSystem("sample")
  val depp = system.actorOf(Props[HollywoodActor])
  val hanks = system.actorOf(Props[HollywoodActor])
  // !方法给 actor 发送数据
  depp ! "Wonka"
  hanks ! "Gump"

  depp ! "Sparrow"
  hanks ! "Phillips"


  val terminateFuture = system.terminate()
  Await.ready(terminateFuture, Duration.Inf)


Wonka - Thread[sample-akka.actor.default-dispatcher-2,5, main]

Gump - Thread[sample-akka.actor.default-dispatcher-3,5, main]

Calling from Thread[main,5, main]

Phillips - Thread[sample-akka.actor.default-dispatcher-3,5, main]
        Sparrow - Thread[sample-akka.actor.default-dispatcher-2,5, main]

3. Actor例子-2


我们通过一个增强版本的 HollywoodActor 来观察这点:

  import scala.collection._
  import scala.concurrent.duration._
  import akka.pattern.ask
  import akka.util.Timeout
  import akka.actor._

  case class Play(role: String)
  case class ReportCount(role: String)

  class NewHollywoodActor() extends Actor {
    val messagesCount: mutable.Map[String, Int] = mutable.Map()

    def receive: Receive = {
      case Play(role) =>
        val currentCount = messagesCount.getOrElse(role, 0)
        messagesCount.update(role, currentCount + 1)
        println(s"Playing $role")
      case ReportCount(role) =>
        sender ! messagesCount.getOrElse(role, 0)

如果接收到Play(role),那我们更新messagesCount对应的值。如果接收到ReportCount,则返回messagesCount的值,这个值也还是通过!方法发送,不同的是发送给 sender,也就是ReportCount的发送方。

接着创建 depp hanks 两个 actor,使用 !方法发送 Play(…):

  val newSystem = ActorSystem("sample")     //> newSystem  : akka.actor.ActorSystem = akka://sample
  val newDepp = newSystem.actorOf(Props[NewHollywoodActor])
      //> newDepp  : akka.actor.ActorRef = Actor[akka://sample/user/$a#-557464924]
  val newHanks = newSystem.actorOf(Props[NewHollywoodActor])
      //> newHanks  : akka.actor.ActorRef = Actor[akka://sample/user/$b#192877015]
  newDepp ! Play("Wonka")
  newHanks ! Play("Gump")

  newDepp ! Play("Wonka")
  newDepp ! Play("Sparrow")

然后我们通过 ?方法发送 ReportCount 获取角色对应的计数(如果需要响应,则使用?方法):

  implicit val timeout: Timeout = Timeout(2.seconds)
      //> timeout  : akka.util.Timeout = Timeout(2 seconds)
  val wonkaFuture = newDepp ? ReportCount("Wonka")
      //| wonkaFuture  : scala.concurrent.Future[Any] = Future(<not completed>)
  val sparrowFuture = newDepp ? ReportCount("Sparrow")
      //> sparrowFuture  : scala.concurrent.Future[Any] = Future(<not completed>)
  val gumpFuture = newHanks ? ReportCount("Gump")
      //> gumpFuture  : scala.concurrent.Future[Any] = Future(<not completed>)

  val wonkaCount = Await.result(wonkaFuture, timeout.duration)
      //> wonkaCount  : Any = 2
  val sparrowCount = Await.result(sparrowFuture, timeout.duration)
      //> sparrowCount  : Any = 1
  val gumpCount = Await.result(gumpFuture, timeout.duration)
      //> gumpCount  : Any = 1

  println(s"wonkaCount: ${wonkaCount}")     //> wonkaCount: 2
  println(s"sparrowCount: ${sparrowCount}") //> sparrowCount: 1
  println(s"gumpCount: ${gumpCount}")       //> gumpCount: 1

  val newTerminateFuture = newSystem.terminate()
      //> newTerminateFuture  : scala.concurrent.Future[akka.actor.Terminated] = Future(<not completed>)
  Await.ready(newTerminateFuture, Duration.Inf)
      //> res1: Actor.newTerminateFuture.type = Future(Success(Terminated(Actor[akka://sample/])))

不同于什么也不返回的!()方法,?()方法返回一个 Future.我们将三次调用返回的 Future,并将其分别保存在变量wonkaFuturesparrowFuturegumpFuture中,然后使用 Awwait 获取响应的 result。

4. 使用Actor模型探索目录



先创建一个无状态的 FileExplorer Actor,接受一个目录名dirName,对外返回两个数据:

  1. 该目录下所有的一级子目录
  2. 该目录下所有的以及文件数量
  class FileExplorer extends Actor {
    def receive: Receive = {
      case dirName: String =>
        val file = new File(dirName)
        val children = file.listFiles()
        var filesCount = 0

        if (children != null) {
          children.filter { _.isDirectory }.foreach{ sender ! _.getAbsolutePath }
          filesCount = children.count { ! _.isDirectory }

        sender ! filesCount

接着创建有状态的 FilesCounter,记录时间、文件数等。

  import akka.routing._

  class FilesCounter extends Actor {
    val start: Long = System.nanoTime
    var filesCount = 0L
    var pending = 0

    val fileExplorers: ActorRef =

    def receive: Receive = {
      case dirName: String =>
        // println(s"dirName: ${dirName}")
        pending = pending + 1
        fileExplorers ! dirName
      case count: Int =>
        filesCount = filesCount + count
        pending = pending - 1
        // println(s"pending: ${pending}")
        if (pending == 0) {
          val end = System.nanoTime
          println(s"Files count: $filesCount")
          println(s"Time taken: ${(end - start)/1.0e9} seconds")

fileExplorers 是 100 个 FileExplorer 的实例,使用 RoundRobin 路由。即发送给 fileExplorers 的消息将会按照该方式分配给具体的 FileExplorer.

其接收目录名,发送给 fileExplores 探索目录,后者则发送回子目录名及文件数。子目录名继续探索,文件数则被递增到 filesCount 这个变量。

最后,通过给 filesCounter 这个 actor 发送一个初始目录名开始统计:

  val fileSystem = ActorSystem("sample")    //> fileSystem  : akka.actor.ActorSystem = akka://sample
  val filesCounter = fileSystem.actorOf(Props[FilesCounter])
                                                  //> filesCounter  : akka.actor.ActorRef = Actor[akka://sample/user/$a#-87288343
                                                  //| ]
  filesCounter ! "/Users/baidu/Documents"
  Thread.sleep(10000)                       //> Files count: 181893
                                                  //| Time taken: 4.18749764 seconds/
