频道栏目
首页 > 资讯 > 云计算 > 正文

Akka并发编程——1、Actor模型(一)

17-05-09        来源:[db:作者]  
收藏   我要投稿

本节主要内容 定义Actor 创建Actor 1. 定义Actor 通过扩展akka.actor.Actor 特质并实现receive方法来定义Actor,代码示例如下 //通过扩展Actor并实现receive方法来定义Actor class MyActor extends Actor { //获取LoggingAdapter,用于日志输出 

1. 定义Actor

通过扩展akka.actor.Actor 特质并实现receive方法来定义Actor,代码示例如下

//通过扩展Actor并实现receive方法来定义Actor
class MyActor extends Actor {
    //获取LoggingAdapter,用于日志输出
    val log = Logging(context.system, this)

    //实现receive方法,定义Actor的行为逻辑,返回的是一个偏函数
    def receive = {
      case "test" => log.info("received test")
      case _      => log.info("received unknown message")
    }
  }

receive方法被定义在Actor当中,方法标签如下

//Actor中的receive方法定义,
type Receive = PartialFunction[Any, Unit]
def receive: Actor.Receive

下面给出其完整使用代码:

object Example_01 extends App{
  import akka.actor.Actor
  import akka.event.Logging
  import akka.actor.ActorSystem
  import akka.actor.Props

  class MyActor extends Actor {
    val log = Logging(context.system, this)

    def receive = {
      case "test" => log.info("received test")
      case _      => log.info("received unknown message")
    }
  }
 //创建ActorSystem对象
  val system = ActorSystem("MyActorSystem")
  //返回ActorSystem的LoggingAdpater
  val systemLog=system.log
  //创建MyActor,指定actor名称为myactor
  val myactor = system.actorOf(Props[MyActor], name = "myactor")

  systemLog.info("准备向myactor发送消息")
  //向myactor发送消息
  myactor!"test"
  myactor! 123

  //关闭ActorSystem,停止程序的运行
  system.shutdown()
}


代码运行结果:

[INFO] [04/02/2016 09:29:54.223] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
[INFO] [04/02/2016 09:29:54.224] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/myactor] received test
[INFO] [04/02/2016 09:29:54.224] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/myactor] received unknown message


输出“[INFO] [04/02/2016 09:29:54.224] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/myactor] received test”中,

[MyActorSystem-akka.actor.default-dispatcher-3]为对应的Actor轻量级线程名
[akka://MyActorSystem/user/myactor]为Actor路径信息,(user在下面的图中有解释,为守护线程。)
received test为
def receive = {
      case "test" => log.info("received test")
      case _      => log.info("received unknown message")
    }


方法处理后的输出。关于[akka://MyActorSystem/user/myactor]路径信息,将在后续内容中进行详细阐述。

 
此外,也可以通过混入ActorLogging来实现日志功能,具体代码如下:
class MyActor extends Actor with ActorLogging{
    def receive = {
      case "test" => log.info("received test")
      case _      => log.info("received unknown message")
    }
  }

ActorLogging的定义如下:

trait ActorLogging { this: Actor ?
  private var _log: LoggingAdapter = _

  def log: LoggingAdapter = {
    // only used in Actor, i.e. thread safe
    if (_log eq null)
      _log = akka.event.Logging(context.system, this)
    _log
  }

}


完整代码如下:

/*
 *定义Actor时混入ActorLogging
 */
object Example_02 extends App{
  import akka.actor.Actor
  import akka.actor.ActorSystem
  import akka.actor.Props

  class MyActor extends Actor with ActorLogging{
    def receive = {
      case "test" => log.info("received test")
      case _      => log.info("received unknown message")
    }
  }

  //创建ActorSystem对象
  val system = ActorSystem("MyActorSystem")
  //返回ActorSystem的LoggingAdpater
  val systemLog=system.log
  //创建MyActor,指定actor名称为myactor
  val myactor = system.actorOf(Props[MyActor], name = "myactor")

  systemLog.info("准备向myactor发送消息")
  //向myactor发送消息
  myactor!"test"
  myactor! 123

  //关闭ActorSystem,停止程序的运行
  system.shutdown()
}

代码运行结果:

[INFO] [04/02/2016 09:39:21.088] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
[INFO] [04/02/2016 09:39:21.089] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/myactor] received test
[INFO] [04/02/2016 09:39:21.089] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/myactor] received unknown message

代码原理与Example_01类似,这里不再赘述。
 

2. 创建Actor

在前面两个例子中,通过

val myactor = system.actorOf(Props[MyActor], name = "myactor")

创建Actor,需要注意的是system.actorOf方法返回的是ActorRef对象(ActorRef为Actor的引用),使用ActorRef对象可以进行消息的发送等操作。Props为配置对象,在创建Actor时使用,它是不可变的对象,因此它是线程案例且完全可共享的。Akka中创建Actor时,也允许直接传入MyActor对象的引用,例如

//直接通过new MyActor的方式传入MyActor对象的引用,注意这里是Props(new MyActor)
val myactor = system.actorOf(Props(new MyActor), name = "myactor")


但是Akka不推荐这么做,官方文档给出的解释是这种方式会导致不可序列化的Props对象且可能会导致竞争条件(破坏Actor的封装性)。另外需要特别注意的是,不允许通过下列代码创建Actor

//下列两行代码编译可以通过,但运行时出抛出异常
  val  myActor=new MyActor
  val myactor = system.actorOf(Props(myActor), name = "myactor")


完整运行代码如下:


/*
 *创建Actor
 */
object Example_03 extends App{
  import akka.actor.Actor
  import akka.actor.ActorSystem
  import akka.actor.Props

  class MyActor extends Actor with ActorLogging{
    def receive = {
      case "test" => log.info("received test")
      case _      => log.info("received unknown message")
    }
  }

  val system = ActorSystem("MyActorSystem")
  val systemLog=system.log

  //下列两行代码编译可以通过,但运行时出抛出异常
  val  myActor=new MyActor
  val myactor = system.actorOf(Props(myActor), name = "myactor")

  systemLog.info("准备向myactor发送消息")
  //向myactor发送消息
  myactor!"test"
  myactor! 123

  //关闭ActorSystem,停止程序的运行
  system.shutdown()
}

运行结果如下:
Exception in thread "main" akka.actor.ActorInitializationException: You cannot create an instance of [chapter02.Example_03$MyActor] explicitly using the constructor (new). You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.
    at akka.actor.ActorInitializationException$.apply(Actor.scala:167)
    at akka.actor.Actor$class.$init$(Actor.scala:423)
    at chapter02.Example_03$MyActor.(MyActor.scala:73)
    at chapter02.Example_03$delayedInit$body.apply(MyActor.scala:84)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
	at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at chapter02.Example_03$.main(MyActor.scala:68)
    at chapter02.Example_03.main(MyActor.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
从“You cannot create an instance of [chapter02.Example_03$MyActor] explicitly using the constructor (new). You have to use one of the ‘actorOf’ factory methods to create a new actor.”

可以看到,不能通过显式地调用构造函数创建Actor,只能使用actorOf工厂方法创建Actor。


(1)调用system.actorOf创建Actor

val system = ActorSystem("mySystem")
val myActor = system.actorOf(Props[MyActor], "myactor2")
完整代码在Example_01、Example_02中已经演示过了,这里需要说明的是通过system.actorOf工厂方法创建的Actor为顶级Actor.


 

在Akka框架中,每个Akka应用程序都会有一个守卫Actor(Daemon Actor),名称为user。所有通过system.actorOf工厂方法创建的Actor都为user的子Actor,也是整个Akka程序的顶级Actor。

(注意顶级Actor和守卫Actor的区别。)

(2)调用context.actorOf创建Actor

/*
 *创建Actor,调用context.actorOf方法
 */
object Example_04 extends App{
  import akka.actor.Actor
  import akka.actor.ActorSystem
  import akka.actor.Props


  class FirstActor extends Actor with ActorLogging{
    //通过context.actorOf方法创建Actor
    val child = context.actorOf(Props[MyActor], name = "myChild")
    def receive = {
      case x => child ! x;log.info("received "+x)
    }

  }


  class MyActor extends Actor with ActorLogging{
    def receive = {
      case "test" => log.info("received test")
      case _      => log.info("received unknown message")
    }
  }

  val system = ActorSystem("MyActorSystem")
  val systemLog=system.log

  //创建FirstActor对象
  val myactor = system.actorOf(Props[FirstActor], name = "firstActor")

  systemLog.info("准备向myactor发送消息")
  //向myactor发送消息
  myactor!"test"
  myactor! 123
  Thread.sleep(5000)
  //关闭ActorSystem,停止程序的运行
  system.shutdown()
}

代码运行结果
[INFO] [04/02/2016 15:05:34.770] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
[INFO] [04/02/2016 15:05:34.771] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor/myChild] received test
[INFO] [04/02/2016 15:05:34.771] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor] received test
[INFO] [04/02/2016 15:05:34.771] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor] received 123
[INFO] [04/02/2016 15:05:34.771] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor/myChild] received unknown message
通过代码的运行结果可以看到,FirstActor的Actor路径信息为akka://MyActorSystem/user/firstActor,而通过
class FirstActor extends Actor with ActorLogging{
    //通过context.actorOf方法创建Actor
    val child = context.actorOf(Props[MyActor], name = "myChild")
    def receive = {
      case x => child ! x;log.info("received "+x)
    }

  }


代码使用context.actorOf创建的MyActor,其Actor路径信息为[akka://MyActorSystem/user/firstActor/myChild],这意味着mychild为firstActor的子Actor,层次结构如下图所示

也就是说context.actorOf和system.actorOf的差别是system.actorOf创建的actor为顶级Actor,而context.actorOf方法创建的actor为调用该方法的Actor的子Actor

相关TAG标签
上一篇:cdh5.3.10_hadoop环境迁移之主机IP修改
下一篇:显示系统所有进程的信息
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站