核心内容:
1、Akka第一个案例动手实战MapActor、ReduceActor、AggregateActor代码详解
1、Akka第一个案例动手实战MapActor、ReduceActor、AggregateActor代码详解 |
1>MapActor的职责在于对传进来的字符串进行单词的解析并将每个单词计数为1;ReduceActor的职责类似与MapReduce中的
combiner;AggregateActor对数据进行全局的统计计算。
2>通过使用instanceOf方法可以对消息的类型进行模式匹配。
3>MapActor、ReduceActor、AggregateActor都继承了UntypedActor,并使用onReceive方法接受消息并对消息进行解析,解析之后进行
路由。
相应代码:
MapActor:
package akka.dt.app.java.actors; import akka.actor.ActorRef; import akka.actor.UntypedActor; import akka.dt.app.java.messages.MapData; import akka.dt.app.java.messages.WordCount; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.StringTokenizer; public class MapActor extends UntypedActor //继承抽象类,并实现里面的抽象方法 { private ActorRef reduceActor = null; public MapActor(ActorRef reduceActor) //属性与相应的构造方法 { this.reduceActor = reduceActor; } //这种写法在scala当中我见到过:学习点1 String[] STOP_WORDS = {"a","is"}; ListSTOP_WORDS_LIST = Arrays.asList(STOP_WORDS); @Override public void onReceive(Object message) throws Exception //实现UntypedActor里面的抽象方法 { //学习点2:判断某个数据是否是某种类型 if(message instanceof String) //判断消息message是否是String类型的 { String work = (String) message; //拆分句子中的单词 MapData data = evaluateExpression(work); //这个方法是自己创建的 //将MapActor处理的结果MapData发送给ReduceActor reduceActor.tell(data); //将处理的结果即mapdata发送给我们的reduceActor } else unhandled(message); //如果传进来的不是字符串就不处理了 } public MapData evaluateExpression(String line) //这个方法返回的就是MapData { List dataList = new ArrayList (); //学习点3 StringTokenizer parser = new StringTokenizer(line); while(parser.hasMoreTokens()) //学习点4 { String word = parser.nextToken().toLowerCase(); //学习点5 if(! STOP_WORDS_LIST.contains(word)) //先判断一下这个单词是否在动态数组当中 { //将单词以WordCount实体类型的方式写到dataList中 dataList.add(new WordCount(word,Integer.valueOf(1))); //学习点6 } } return new MapData(dataList); //最后将产生的结果传送给我们的MapData } }
ReduceActor:
package akka.dt.app.java.actors; import akka.actor.ActorRef; import akka.actor.UntypedActor; import akka.dt.app.java.messages.MapData; import akka.dt.app.java.messages.ReduceData; import akka.dt.app.java.messages.WordCount; import java.util.HashMap; import java.util.List; public class ReduceActor extends UntypedActor { private ActorRef aggregateActor = null; public ReduceActor(ActorRef inAggregateActor) { aggregateActor = inAggregateActor; } @Override public void onReceive(Object message) throws Exception { if (message instanceof MapData) { MapData mapData = (MapData) message; //reduce the incoming data ReduceData reduceData = reduce(mapData.getDataList()); //forward the result to aggregate actor aggregateActor.tell(reduceData); }else unhandled(message ); } private ReduceData reduce(ListdataList) //WordCount里面存储的是 { HashMap reducedMap = new HashMap (); for (WordCount wordCount:dataList) { if (reducedMap.containsKey(wordCount.getWord())) { Integer value = (Integer) reducedMap.get(wordCount.getWord()); value ++; reducedMap.put(wordCount.getWord(),value); } else { reducedMap.put(wordCount.getWord(),Integer.valueOf(1)); } } return new ReduceData(reducedMap); } }
AggregateActor:
package akka.dt.app.java.actors; import akka.actor.UntypedActor; import akka.dt.app.java.messages.ReduceData; import akka.dt.app.java.messages.Result; import java.util.HashMap; import java.util.Map; public class AggregateActor extends UntypedActor //三个Actor都是通过UntypedActor的方式创建的 { public MapfinalReduceMap = new HashMap (); @Override public void onReceive(Object message) throws Exception { //不断循环自己的邮箱,并根据消息的类型来进行模式匹配 if(message instanceof ReduceData) //判断接受消息的类型 { ReduceData reduceData = (ReduceData)message; aggregateInMemoryReduce(reduceData.getReduceDataList()); }else if(message instanceof Result) { //学习点6 System.out.println(finalReduceMap.toString()); }else unhandled(message); } public void aggregateInMemoryReduce(Map reducedList) { Integer count = null; for (String key:reducedList.keySet()) { if(finalReduceMap.containsKey(key)) { count = reducedList.get(key) + finalReduceMap.get(key); finalReduceMap.put(key,count); }else { finalReduceMap.put(key,reducedList.get(key)); } } } }
如有问题,欢迎留言指正!