核心内容:
1、使用SBT开发Akka第一个案例:源码解析MapActor、ReduceActor、AggregateActor
实例程序:HelloAkka
package akka.dt.app.scala.messages

import akka.actor.{ActorRef, ActorSystem, Props}
import akka.dt.app.scala.actors.MasterActor

/**
 * Entry point of the word-count demo.
 *
 * Creates the actor system, sends three lines of text to the master actor,
 * then sends a [[Result]] message to request the aggregated counts, and
 * finally shuts the actor system down.
 */
object HelloAkka {

  /**
   * Runs the demo.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // ActorSystem can be created either through `create` on the companion
    // object or through its `apply` method; `create` is used here.
    val system: ActorSystem = ActorSystem.create("HelloAkka")
    try {
      // NOTE: the original author marked this line as one that will change later.
      val master: ActorRef = system.actorOf(Props[MasterActor], "master")

      // `!` is a method on ActorRef; `master` is the receiver of each message.
      master ! "Scala and Hadoop"
      master ! "Java and Spark Scala Hadoop and "
      master ! "Hbase and spark Java and Scala"

      // Messages are processed asynchronously; wait before asking for the result.
      Thread.sleep(2000)
      master ! new Result // request the final result

      // Wait again before shutting down, presumably so the result can be printed.
      Thread.sleep(2000)
    } finally {
      // Always stop the actor system, even if something above throws;
      // otherwise its threads would keep the JVM alive.
      system.shutdown()
    }
  }
}
实例程序:MapData
package akka.dt.app.scala.messages

import java.util

/**
 * Message wrapping the list of word occurrences produced by the map step.
 *
 * `WordCount` is declared elsewhere in the project (not shown in this listing).
 *
 * @param dataList the word occurrences carried by this message
 */
class MapData(var dataList: util.ArrayList[WordCount]) {

  /**
   * Java-style accessor.
   *
   * @return the list this message was constructed with (or last assigned)
   */
  def getDataList(): util.ArrayList[WordCount] = {
    this.dataList
  }
}
实例程序:ReduceData
package akka.dt.app.scala.messages

import java.util

/**
 * Message wrapping a word -> count table produced by the reduce step.
 *
 * @param hashMap the per-word counts carried by this message
 */
class ReduceData(var hashMap: util.HashMap[String, Integer]) {

  /**
   * Java-style accessor.
   *
   * @return the map this message was constructed with (or last assigned)
   */
  def getHashMap(): util.HashMap[String, Integer] = {
    this.hashMap
  }
}
实例程序:Result
package akka.dt.app.scala.messages

/**
 * Marker message with no payload.
 *
 * HelloAkka sends an instance to the master actor, which forwards it to the
 * aggregate actor; the aggregate actor reacts by printing the totals it has
 * accumulated so far.
 */
class Result
实例程序:AggregateActor
package akka.dt.app.scala.actors

import java.util

import akka.actor.Actor
import akka.dt.app.scala.messages.{ReduceData, Result}

import scala.collection.JavaConverters._

/**
 * Final stage of the word-count pipeline: accumulates the partial counts
 * received in [[ReduceData]] messages and prints the running totals when a
 * [[Result]] message arrives.
 */
class AggregateActor extends Actor {

  /** Running word -> total count table, updated on every ReduceData message. */
  var finalHashMap = new util.HashMap[String, Integer]()

  /**
   * Message handler.
   *
   *  - `ReduceData`: merge its counts into the running totals.
   *  - `Result`: print the running totals to standard output.
   *  - anything else: ignored.
   */
  override def receive: Receive = {
    case message: ReduceData =>
      aggregateInMemoryReduce(message.getHashMap())
    case _: Result =>
      println(finalHashMap.toString) // output the final result
    case _ => // any other message is deliberately ignored
  }

  /**
   * Adds every count in `hashMap` to the running totals in `finalHashMap`.
   *
   * @param hashMap partial word -> count table to merge in
   */
  def aggregateInMemoryReduce(hashMap: util.HashMap[String, Integer]): Unit = {
    for (entry <- hashMap.entrySet().asScala) {
      val word = entry.getKey
      val incoming = entry.getValue
      // `get` returns null for a word not seen before; Option(...) turns that
      // into None so the incoming count is stored as-is in that case.
      val merged: Integer = Option(finalHashMap.get(word))
        .map(existing => Integer.valueOf(existing.intValue + incoming.intValue))
        .getOrElse(incoming)
      finalHashMap.put(word, merged)
    }
  }
}
实例程序:MapActor
package akka.dt.app.scala.actors

import java.util.{StringTokenizer, _}

import akka.actor.{Actor, ActorRef}
import akka.dt.app.scala.messages.{MapData, WordCount}

/**
 * First stage of the word-count pipeline: splits each received line of text
 * into words and sends the resulting [[MapData]] to the reduce actor.
 *
 * @param reduceActor actor that receives the MapData produced for each line
 */
class MapActor(reduceActor: ActorRef) extends Actor {

  /**
   * Message handler.
   *
   *  - `String`: tokenize it and send the result to `reduceActor`.
   *  - anything else: ignored.
   */
  override def receive: Receive = {
    case message: String =>
      reduceActor ! evaluateExpression(message)
    case _ => // any other message is deliberately ignored
  }

  /**
   * Tokenizes `line` with a default `StringTokenizer` and emits one
   * `WordCount` with a count of 1 per token.
   *
   * @param line the text to split into words
   * @return a MapData wrapping one WordCount per token, in token order
   */
  def evaluateExpression(line: String): MapData = {
    // None of these references is ever reassigned, so they are `val`s;
    // `var` is only needed when a name must be rebound.
    val list = new ArrayList[WordCount]()
    val parser = new StringTokenizer(line)
    val defaultCount: Integer = 1

    while (parser.hasMoreTokens) {
      val word = parser.nextToken()
      list.add(new WordCount(word, defaultCount))
    }
    new MapData(list)
  }
}
实例程序:MasterActor
package akka.dt.app.scala.actors

import akka.actor.{Actor, Props}
import akka.dt.app.scala.messages.Result

/**
 * Top-level actor of the word-count pipeline.
 *
 * Creates the aggregate, reduce and map actors as its children, wiring each
 * stage to the next, and routes incoming messages to the right stage.
 */
class MasterActor extends Actor {

  /** Last stage: accumulates totals and prints them on request. */
  val aggregateActor = context.actorOf(Props[AggregateActor], name = "aggregate")

  /** Middle stage: sends its output to `aggregateActor`. */
  val reduceActor = context.actorOf(Props(new ReduceActor(aggregateActor)), name = "reduce")

  /** First stage: sends its output to `reduceActor`, hence the constructor argument. */
  val mapActor = context.actorOf(Props(new MapActor(reduceActor)), name = "map")

  /**
   * Message handler.
   *
   *  - `String`: a line of text, handed to the map actor.
   *  - `Result`: a request for the totals, handed to the aggregate actor.
   *  - anything else: ignored.
   */
  override def receive: Receive = {
    case line: String =>
      mapActor ! line
    case request: Result =>
      aggregateActor ! request
    case _ =>
      () // any other message is deliberately ignored
  }
}
实例程序:ReduceActor
package akka.dt.app.scala.actors

import java.util

import akka.actor.{Actor, ActorRef}
import akka.dt.app.scala.messages.{MapData, ReduceData, WordCount}

import scala.collection.JavaConverters._

/**
 * Middle stage of the word-count pipeline: folds the word occurrences of one
 * [[MapData]] message into a word -> count table and sends it, wrapped in a
 * [[ReduceData]], to the aggregate actor.
 *
 * @param aggregateActor actor that receives the ReduceData produced here
 */
class ReduceActor(var aggregateActor: ActorRef) extends Actor {

  /**
   * Message handler (`Receive` is a partial function).
   *
   *  - `MapData`: reduce it locally and send the result to `aggregateActor`.
   *  - anything else: ignored.
   */
  override def receive: Receive = {
    case message: MapData =>
      val reduceData: ReduceData = reduce(message.getDataList())
      aggregateActor ! reduceData
    case _ => // any other message is deliberately ignored
  }

  /**
   * Local merge: counts how many times each word occurs in `list`.
   *
   * Each list element contributes exactly 1 to its word's count.
   *
   * @param list word occurrences to count
   * @return a ReduceData wrapping the word -> occurrence-count table
   */
  def reduce(list: util.ArrayList[WordCount]): ReduceData = {
    val hashMap = new util.HashMap[String, Integer]()

    for (wordCount <- list.asScala) {
      val word = wordCount.getWord()
      // `get` returns null for a word not seen yet; Option(...) maps that to
      // None, so the first occurrence is stored as 1.
      val occurrences = Option(hashMap.get(word)).map(_.intValue + 1).getOrElse(1)
      hashMap.put(word, Integer.valueOf(occurrences))
    }

    // The last expression is the method's result; no explicit `return` needed.
    new ReduceData(hashMap)
  }
}
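程序运行结果:我调试了很长时间,但是始终抛出一个异常(java.lang.NoSuchMethodError)。从下面的类路径和异常堆栈来看,原因很可能是 Scala 与 Akka 的版本不兼容:akka-actor-2.0.5.jar 是针对 Scala 2.9.x 编译的,而类路径中排在它前面的 D:\SOFTWARE\scala\lib\scala-library.jar 是较新版本的 Scala,其中 Predef.augmentString 的方法签名已经改变,所以运行时找不到该方法。解决办法是让两者版本保持一致,例如通过 SBT 引入与当前 Scala 版本匹配的 akka-actor 依赖,而不是手动添加 akka-2.0.5 目录下的 jar 包。完整输出如下: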
"C:\Program Files (x86)\Java\jdk1.8.0_65\bin\java" -Didea.launcher.port=7533 "-Didea.launcher.bin.path=E:\Smart IDEA\IntelliJ IDEA Community Edition 2016.2.4\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files (x86)\Java\jdk1.8.0_65\jre\lib\charsets.jar;C:\Program Files (x86)\Java\jdk1.8.0_65\jre\lib\deploy.jar;C:\Program Files (x86)\Java\jdk1.8.0_65\jre\lib\ext\access-bridge-32.jar;C:\Program Files (x86)\Java\jdk1.8.0_65\jre\lib\ext\cldrdata.jar;C:\Program Files (x86)\Java\jdk1.8.0_65\jre\lib\ext\dnsns.jar;C:\Program Files (x86)\Java\jdk1.8.0_65\jre\lib\ext\jaccess.jar;C:\Program Files (x86)\Java\jdk1.8.0_65\jre\lib\ext\jfxrt.jar;C:\Program Files (x86)\Java\jdk1.8.0_65\jre\lib\ext\localedata.jar;C:\Program Files (x86)\Java\jdk1.8.0_65\jre\lib\ext\nashorn.jar;C:\Program Files (x86)\Java\jdk1.8.0_65\jre\lib\ext\sunec.jar;C:\Program Files (x86)\Java\jdk1.8.0_65\jre\lib\ext\sunjce_provider.jar;C:\Program Files (x86)\Java\jdk1.8.0_65\jre\lib\ext\sunmscapi.jar;C:\Program Files (x86)\Java\jdk1.8.0_65\jre\lib\ext\sunpkcs11.jar;C:\Program Files (x86)\Java\jdk1.8.0_65\jre\lib\ext\zipfs.jar;C:\Program Files (x86)\Java\jdk1.8.0_65\jre\lib\javaws.jar;C:\Program Files (x86)\Java\jdk1.8.0_65\jre\lib\jce.jar;C:\Program Files (x86)\Java\jdk1.8.0_65\jre\lib\jfr.jar;C:\Program Files (x86)\Java\jdk1.8.0_65\jre\lib\jfxswt.jar;C:\Program Files (x86)\Java\jdk1.8.0_65\jre\lib\jsse.jar;C:\Program Files (x86)\Java\jdk1.8.0_65\jre\lib\management-agent.jar;C:\Program Files (x86)\Java\jdk1.8.0_65\jre\lib\plugin.jar;C:\Program Files (x86)\Java\jdk1.8.0_65\jre\lib\resources.jar;C:\Program Files 
(x86)\Java\jdk1.8.0_65\jre\lib\rt.jar;E:\HelloScala\out\production\HelloScala;D:\SOFTWARE\scala\lib\scala-actors-migration.jar;D:\SOFTWARE\scala\lib\scala-actors.jar;D:\SOFTWARE\scala\lib\scala-library.jar;D:\SOFTWARE\scala\lib\scala-reflect.jar;D:\SOFTWARE\scala\lib\scala-swing.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\scala-library.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\akka-actor-2.0.5.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\akka-actor-migration-2.0.5.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\akka-agent-2.0.5.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\akka-beanstalk-mailbox-2.0.5.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\akka-file-mailbox-2.0.5.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\akka-kernel-2.0.5.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\akka-mailboxes-common-2.0.5.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\akka-mongo-mailbox-2.0.5.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\akka-redis-mailbox-2.0.5.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\akka-remote-2.0.5.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\akka-sample-fsm-2.0.5.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\akka-sample-hello-2.0.5.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\akka-sample-hello-kernel-2.0.5.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\akka-sample-remote-2.0.5.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\akka-slf4j-2.0.5.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\akka-testkit-2.0.5.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\akka-transactor-2.0.5.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\akka-zeromq-2.0.5.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\akka-zookeeper-mailbox-2.0.5.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\beanstalk_client-1.4.5.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\bson-driver_2.9.0-1-0.2.9-1.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\commons-codec-1.4.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\commons-io-1.4.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\commons-io-2.0.1.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\commons-logging-1.1.1.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\commons-pool-1.5.6.jar;D:\SOFTWARE\akka\akka-2
.0.5\lib\akka\config-0.3.1.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\dispatch-json_2.9.1-0.8.5.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\h2-lzf-1.0.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\httpclient-4.1.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\httpcore-4.1.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\jna-3.0.9.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\jnr-constants-0.8.2.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\log4j-1.2.14.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\mongo-driver_2.9.0-1-0.2.9-1.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\mongo-java-driver-2.7.1.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\netty-3.5.4.Final.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\objenesis-1.2.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\protobuf-java-2.4.1.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\redisclient_2.9.1-2.4.0.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\scala-stm_2.9.1-0.5.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\scalaj-collection_2.9.0-1-1.2.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\sjson_2.9.1-0.15.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\slf4j-api-1.6.4.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\util-core-1.12.2.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\zeromq-scala-binding_2.9.1-0.0.6.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\zkclient-0.3.jar;D:\SOFTWARE\akka\akka-2.0.5\lib\akka\zookeeper-3.4.0.jar;E:\Smart IDEA\IntelliJ IDEA Community Edition 2016.2.4\lib\idea_rt.jar" com.intellij.rt.execution.application.AppMain akka.dt.app.scala.messages.HelloAkka Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.augmentString(Ljava/lang/String;)Lscala/collection/immutable/StringOps; at akka.util.Duration$.(Duration.scala:76) at akka.util.Duration$. (Duration.scala) at akka.actor.ActorSystem$Settings. (ActorSystem.scala:140) at akka.actor.ActorSystemImpl. 
(ActorSystem.scala:449) at akka.actor.ActorSystem$.apply(ActorSystem.scala:111) at akka.actor.ActorSystem$.apply(ActorSystem.scala:93) at akka.actor.ActorSystem$.create(ActorSystem.scala:56) at akka.dt.app.scala.messages.HelloAkka$.main(HelloAkka.scala:14) at akka.dt.app.scala.messages.HelloAkka.main(HelloAkka.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) Process finished with exit code 1