频道栏目
首页 > 资讯 > 其他综合 > 正文

Storm基础

16-11-23        来源:[db:作者]  
收藏   我要投稿

想要学习Storm相关的知识的话,那一定是对Hadoop的基础知识有了一定的了解(啰嗦一下,至少知道MapReduce的模型是什么概念,具体的处理过程是什么样的,不需要有多深的理解,至少不会出现相应的误解)

废话不多说了,我们先说说Storm我们用来都能干什么吧,其实说白了,最简单的一个应用的场景就是可以实时的给用户推送相应的内容,而这些内容都是实时采集的内容,就是可以实时的统计出用户提交的信息,并展示用户的信息。

spout bolts Topology

Storm的实时处理任务被打包成Topology发布,同hadoop的MapReduce任务相似(Job)但是不同的是一旦提交的话就不会停止处理完,直到手动的终止这个topology才会停止。

spout:storm的消息源,为topology提供数据;
bolts:storm的消息处理者,为topology进行消息处理;
toplolgy:处理的任务实体;

这里写图片描述

作为一个storm的门的小白,知道其组成都由什么构成对于后续的处理过程的理解是有好处的,那么接下来我们就从代码的角度理解Storm,我深知学习和了解一门新知识没有什么比代码更具有说服力的啦!!

实现一个简单的Storm(本地运行)的代码:
代码由四部分构成:
bolts,count,main和spout,只要完成一个实时的单词计数程序!

spouts文件的内容:
最主要的是nextTuple函数,open函数,和declareOutputFields函数,

在nextTuple函数中,向bolts中分发相应的文本行,nextTuple会在ack和fail中被周期性的调用;核心的是emit函数的提交,提交的是键值对,作为元祖发送出去;
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)。它接收如下参数:配置对象,在定义topology对象是创建;TopologyContext对象,包含所有拓扑数据;还有SpoutOutputCollector对象,它能让我们发布交给bolts处理的数据

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Values;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.InterruptedIOException;
import java.util.Map;

/**
 * spouts定义了输入的数据类型和处理过程,实现WordReader类实现了IRichSpout接口,负责从文件按行读取文件
 * 并把文本行给bolts
 */
public class spouts implements IRichSpout{
        private SpoutOutputCollector spoutOutputCollector;
        private FileReader fileReader;
        private boolean complateBoolean = false;
        private TopologyContext topologyContext;
    public void ack(Object msgID){
        System.out.println("OK:"+msgID);
    }
    public void close(){}



    public void fail(Object msgID){
        System.out.println("FAIL:"+msgID);
    }
    /**
     * 实现的过程作的唯一的过程,分发文件中的文本行,向bolts发送待处理的数据,逐行读取数据并进行处理
     * nextTuple会在ack()和fall()的函数内被周期性的调用,没有任务的时候释放对该线程的控制,其他的方法采用机会被执行
     * 因此程序的第一行执行的是检查其是否已经被处理完成,如果处理完成就降低处理器的负载,会在返回之前休眠1s,如果任务完成了,文件中的每一行都会被分发出来了
     *
     */
    public void nextTuple(){
        /**
         *一直调用的程序,直到整个程序执行完
         */
        if(complateBoolean){
            try{
                Thread.sleep(1000);
            }catch (Exception e){
                throw new RuntimeException("Error! Time out!");
            }
            return;
        }
        String string;
        BufferedReader bufferedReader = new BufferedReader(fileReader);
        try{
            while((string = bufferedReader.readLine())!= null){
                this.spoutOutputCollector.emit(new Values(string),string);
            }
        }catch (Exception e){
            throw new RuntimeException("Error reading tuple",e);
        }finally {
            complateBoolean = true;
        }
    }
    /**
     * 我们将创建一个文件,并维持collector对象
     * open方法将会被第一个调用
     * 接受的参数如下:
     * conf:配置对象,在定义topology对象创建
     * context:包含所有的拓扑数据
     * spoutOutputCollector:发布交给bolts处理的数据
     */
    public void open(Map conf,TopologyContext context,SpoutOutputCollector spoutOutputCollector){
        try{
            this.topologyContext = context;
            this.fileReader = new FileReader(conf.get("wordsFile").toString());//用来读取文件
        }catch(FileNotFoundException e){
            throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
        }
        this.spoutOutputCollector= spoutOutputCollector;
    }

    @Override
    public boolean isDistributed() {
        return false;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}

bolts文件的内容:

declareOutputFields函数,定义一个域,声明bolt的输出模式;execute处理输入的单个元祖;
perpare在bolt开始处理元祖之前调用;
cleanup关闭时调用;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * spouts读取数据,按行读取数据,每行发布一个元祖,还要创建bolts过程;
 * bolts实现了接口IRichBolt接口
 * bolts执行execute(Tuple input),每一次接收元祖都会调用一次,当执行nextTuple或execute方法时
 *
 * 整个bolt执行的是WordNormallzer:负责得到并标准化每行文本,把文件切分成单词,大写转成小写,去掉头尾的空白符
 */
public class bolts implements IRichBolt{
    private OutputCollector outputCollector;
    /**
     * bolt发布一个名为"word"的域
     * @param declarer
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer){
        declarer.declare(new Fields("word"));
    }



    /**
     * 处理传入的元祖
     * @param input
     */
    @Override
    public void execute(Tuple input){
        String sentence=input.getString(0);
        String[] words=sentence.split(" ");
        for(String word:words){
            word.trim();
            if(!word.isEmpty()){
                word=word.toLowerCase();
                List a=new ArrayList();
                a.add(input);
                //发布这个单词
                outputCollector.emit(a,new Values(word));
            }
        }
        outputCollector.ack(input);
    }

    @Override
    public void cleanup() {

    }

    @Override
    public void prepare(Map conf, TopologyContext context,OutputCollector outputCollector){
        this.outputCollector = outputCollector;
    }

}

count文件:

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

/**
 * 负责计数的bolts,结果显示
 */
public class count implements IRichBolt{
    Integer id;
    String name;
    Map counter;
    private OutputCollector outputCollector;

    @Override
    public void cleanup(){
        System.out.println("--单词数量:"+name+"-"+id+"--");
        for(Map.Entry entry:counter.entrySet()){
            System.out.println(entry.getKey()+": "+entry.getValue());
        }
    }




    /**
     *    单词计数
     */
    @Override
    public void execute(Tuple tuple){
        String string=tuple.getString(0);
        if(!counter.containsKey(string)){
            counter.put(string,1);
        }else{
            Integer c=counter.get(string)+1;
            counter.put(string,c);
        }
        outputCollector.ack(tuple);
    }
    @Override
    public void prepare(Map conf, TopologyContext context,OutputCollector outputCollector){
        this.counter=new HashMap();
        this.outputCollector=outputCollector;
        this.name=context.getThisComponentId();
        this.id=context.getThisTaskId();
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer){}




}

Main文件:

public class Main {
    public static void main(String[] args) throws Exception{
        TopologyBuilder topologyBuilder=new TopologyBuilder();
        topologyBuilder.setSpout("word-reader",new spouts());
        topologyBuilder.setBolt("word-normal",new bolts()).shuffleGrouping("word-reader");
        topologyBuilder.setBolt("word-count",new count(),2).fieldsGrouping("word-normal",new Fields("word"));

        Config config=new Config();
        String path="../src/main/resources/words.txt";
        config.put("wordFile",path);
        config.setDebug(true);
        //config.setNumWorkers(2);

        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING,1);
        LocalCluster localCluster=new LocalCluster();
        //localCluster.shutdown();
        localCluster.submitTopology("Get-start-topology",config,topologyBuilder.createTopology());//提示有错误
        Thread.sleep(1000);
        localCluster.killTopology("Get-start-topology");
        localCluster.shutdown();
    }
}
相关TAG标签
上一篇:uboot编译之体验
下一篇:pyhton概览加数据类型
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站