频道栏目
首页 > 程序开发 > 软件开发 > 其他 > 正文
大数据系列修炼:Scala课程94
2016-10-18 09:13:00      个评论    来源:数据分析玩家  
收藏   我要投稿

核心内容:

1、Akka第一个案例动手实战MapActor、ReduceActor、AggregateActor代码详解


1、Akka第一个案例动手实战MapActor、ReduceActor、AggregateActor代码详解

1>MapActor的职责在于对传进来的字符串进行单词的解析并将每个单词计数为1;ReduceActor的职责类似与MapReduce中的
combiner;AggregateActor对数据进行全局的统计计算。
2>通过使用instanceOf方法可以对消息的类型进行模式匹配。
3>MapActor、ReduceActor、AggregateActor都继承了UntypedActor,并使用onReceive方法接受消息并对消息进行解析,解析之后进行
路由。


相应代码:
MapActor:

package akka.dt.app.java.actors;

import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.dt.app.java.messages.MapData;
import akka.dt.app.java.messages.WordCount;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringTokenizer;


public class MapActor extends UntypedActor  //继承抽象类,并实现里面的抽象方法
{
    private ActorRef reduceActor = null;
    public MapActor(ActorRef reduceActor)  //属性与相应的构造方法
    {
        this.reduceActor = reduceActor;
    }
    //这种写法在scala当中我见到过:学习点1
    String[] STOP_WORDS = {"a","is"};
    List  STOP_WORDS_LIST =   Arrays.asList(STOP_WORDS);
    @Override
    public void onReceive(Object message) throws Exception   //实现UntypedActor里面的抽象方法
    {
        //学习点2:判断某个数据是否是某种类型
        if(message instanceof String)  //判断消息message是否是String类型的
        {
            String work = (String) message;
            //拆分句子中的单词
            MapData data = evaluateExpression(work);  //这个方法是自己创建的
            //将MapActor处理的结果MapData发送给ReduceActor
            reduceActor.tell(data);  //将处理的结果即mapdata发送给我们的reduceActor
        }
        else unhandled(message); //如果传进来的不是字符串就不处理了
    }

    public MapData evaluateExpression(String line)   //这个方法返回的就是MapData
    {
        List dataList = new ArrayList();
        //学习点3
        StringTokenizer parser = new StringTokenizer(line);
        while(parser.hasMoreTokens()) //学习点4
        {
            String word = parser.nextToken().toLowerCase(); //学习点5
            if(!   STOP_WORDS_LIST.contains(word)) //先判断一下这个单词是否在动态数组当中
            {
                //将单词以WordCount实体类型的方式写到dataList中
                dataList.add(new WordCount(word,Integer.valueOf(1))); //学习点6
            }
        }
        return new MapData(dataList);  //最后将产生的结果传送给我们的MapData
    }


}

ReduceActor:

package akka.dt.app.java.actors;

import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.dt.app.java.messages.MapData;
import akka.dt.app.java.messages.ReduceData;
import akka.dt.app.java.messages.WordCount;

import java.util.HashMap;
import java.util.List;


public class ReduceActor extends UntypedActor
{
    private ActorRef aggregateActor = null;
    public ReduceActor(ActorRef inAggregateActor)
    {
        aggregateActor = inAggregateActor;
    }
    @Override
    public void onReceive(Object message) throws  Exception
    {
        if (message instanceof MapData)
        {
            MapData mapData = (MapData) message;
            //reduce the incoming data
            ReduceData reduceData = reduce(mapData.getDataList());
            //forward the result to aggregate actor
            aggregateActor.tell(reduceData);
        }else
            unhandled(message );
    }
    private  ReduceData reduce(List dataList)  //WordCount里面存储的是
    {
        HashMap reducedMap = new HashMap();
        for (WordCount wordCount:dataList)
        {
            if (reducedMap.containsKey(wordCount.getWord()))
            {
                Integer value = (Integer) reducedMap.get(wordCount.getWord());
                value ++;
                reducedMap.put(wordCount.getWord(),value);
            } else
            {
                reducedMap.put(wordCount.getWord(),Integer.valueOf(1));
            }
        }
        return new ReduceData(reducedMap);
    }
}

AggregateActor:

package akka.dt.app.java.actors;

import akka.actor.UntypedActor;
import akka.dt.app.java.messages.ReduceData;
import akka.dt.app.java.messages.Result;

import java.util.HashMap;
import java.util.Map;


public class AggregateActor extends UntypedActor   //三个Actor都是通过UntypedActor的方式创建的
{
    public Map finalReduceMap = new HashMap();
    @Override
    public void onReceive(Object message) throws Exception
    {
       //不断循环自己的邮箱,并根据消息的类型来进行模式匹配
        if(message instanceof ReduceData)  //判断接受消息的类型
       {
           ReduceData reduceData = (ReduceData)message;
           aggregateInMemoryReduce(reduceData.getReduceDataList());
       }else if(message instanceof Result)
       {
           //学习点6
           System.out.println(finalReduceMap.toString());
       }else
           unhandled(message);
    }
    public void aggregateInMemoryReduce(Map reducedList)
    {
        Integer count = null;
        for (String key:reducedList.keySet())
        {
            if(finalReduceMap.containsKey(key))
            {
                count = reducedList.get(key) + finalReduceMap.get(key);
                finalReduceMap.put(key,count);
            }else
            {
                finalReduceMap.put(key,reducedList.get(key));
            }
        }
    }
}

如有问题,欢迎留言指正!

点击复制链接 与好友分享!回本站首页
上一篇:Redis有序集合命令ZREVRANGEBYSCORE详解与应用
下一篇:大数据系列修炼:Scala课程95
相关文章
图文推荐
点击排行

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站