频道栏目
首页 > 资讯 > 云计算 > 正文

storm学习(1)word_count程序

16-08-17        来源:[db:作者]  
收藏   我要投稿

准备

前面几篇文章已经介绍了如何搭建storm集群。接下来学习如何编写storm代码,使用maven构建,本地模拟集群测试代码

编码

maven 配置

使用maven来配置需要的jar包,只需要一个0.9.2版本的 storm即可

   
        
            org.apache.storm
            storm-core
            0.9.2-incubating
        
    

创建SentenceSpout

创建sentenceSpout作为storm的数据源,需要继承 BaseRichSpout ,它需要重写几个方法

     /*
    spout初始化时调用这个方法
    map包含storm配置信息
    TopologyContext对象提供了topology中组件的信息
    SpoutOutputCollector对象提供了发射tuple的方法
     */
    public void open(Map config, TopologyContext context, SpoutOutputCollector collector){
         this.collector=collector;
    }

open方法是spout的初始化时会调用的方法,传入该方法的SpoutOutputCollector对象提供了向外发射数据流的方法

 /*
     声明spout会发射一个数据流,其中的tuple包含一个字段sentence
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer){
        declarer.declare(new Fields("sentence"));

    };

declareOutputFields方法声明了该spout会发射一个数据流,该数据流包含一个key,名称为 sentence

   /*
     Storm通过调用这个方法向输出的collector发射tuple
    */
    public void nextTuple(){

        if(index < sentences.length){
            this.collector.emit(new Values(sentences[index]));
            index++;
        }
        else{
            index=0; //如果不让index归0,sentences只会发送一次
        }

        Utils.waitForMillis(1);
    }

storm会调用spout的nextTuple,因此我们需要在nextTuple中使用collector.emit()方法向外发射数据流。
完成spout如下:

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.Map;

/**
 * Created by chenhong on 16/1/25.
 */
public class SentenceSpout extends BaseRichSpout{

    private SpoutOutputCollector collector;

    private String[] sentences={
            "my dog has fleas",
            "i like cold beverages",
            "the dog ate my homework",
            "don't hava a cow man ",
            "i don't think i like fleas"
    };

    private int index=0;


    /*
     声明spout会发射一个数据流,其中的tuple包含一个字段sentence
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer){
        declarer.declare(new Fields("sentence"));

    };
    /*
    spout初始化时调用这个方法
    map包含storm配置信息
    TopologyContext对象提供了topology中组件的信息
    SpoutOutputCollector对象提供了发射tuple的方法
     */
    public void open(Map config, TopologyContext context, SpoutOutputCollector collector){
         this.collector=collector;
    }

    /*
     Storm通过调用这个方法向输出的collector发射tuple
    */
    public void nextTuple(){

        if(index < sentences.length){
            this.collector.emit(new Values(sentences[index]));
            index++;
        }
        else{
            index=0; //如果不让index归0,sentences只会发送一次
        }

        Utils.waitForMillis(1);
    }
}

创建SplitSentenceBolt

创建SplitSentenceBolt,对spout发射的句子按空格分隔成单词,需要继承BaseRichBolt,它需要重写一下几个方法:

     /*
     bolt初始化时调用
     */
    public void prepare(Map config ,TopologyContext context,OutputCollector collector){
        this.collector = collector;
    }

prepare在bolt初始化时会被调用,类似spout的open方法,OutputCollector用来向外发射一个数据流

    /*
     声明每个tuple包含一个字段 word
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer){
        declarer.declare(new Fields("word"));
    }

declareOutputFields表示该bolt会向外发射一个数据流,该数据流包含一个key,key的名称为 word

  /*
     每当从订阅的数据流中接收一个tuple,都会调用这个方法
     */
    public void execute(Tuple tuple){
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for(String word : words){
            this.collector.emit(new Values(word));
        }
    }

该方法会在bolt接收到上游发射的数据流后被调用
完成代码如下:

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

/**
 * Created by chenhong on 16/1/25.
 */
public class SplitSentenceBolt extends BaseRichBolt{

    private OutputCollector collector;

    /*
     bolt初始化时调用
     */
    public void prepare(Map config ,TopologyContext context,OutputCollector collector){
        this.collector = collector;
    }

    /*
     声明每个 tuple包含一个字段 word
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer){
        declarer.declare(new Fields("word"));
    }

    /*
     每当从订阅的数据流中接收一个tuple,都会调用这个方法
     */
    public void execute(Tuple tuple){
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for(String word : words){
            this.collector.emit(new Values(word));
        }
    }
}

创建WordCountBolt

WordCountBolt用来对SplitSentenceBolt发射的单词进行计数,WordCountBolt需要实现的方法与SplitSentenceBolt类似,因此不再对方法多做解释。完成代码如下:

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

/**
 * Created by chenhong on 16/1/25.
 */
public class WordCountBolt extends BaseRichBolt{
    private OutputCollector collector;

    private HashMap counts = null;

    public void prepare(Map config , TopologyContext context,OutputCollector collector){
        this.collector = collector;
        this.counts = new HashMap();
    }



    public void execute(Tuple tuple){
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if(count ==null){
            count =0L;
        }
        count++;
        this.counts.put(word,count);
        this.collector.emit(new Values(word,count));
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word","count"));
    }
}

创建ReportBolt

ReportBolt作为最后一个Bolt,它的作用是对计数完的单词进行打印或持久化到数据库。因此无需再向外发射数据流。因此它的declareOutputFields不需要声明declare

      /*
     该bolt位于末端,所以declareOutputFields为空
    **/
    public void declareOutputFields(OutputFieldsDeclarer declarer){

    }

在该计数完成后,将计数结果打印,因此需要重写cleanup方法

      /*
     cleanup方法用来释放bolt占用的资源
     */
    public void cleanup(){
        System.out.println("--- FINAL COUNTS ---");
        List keys = new ArrayList();
        keys.addAll(this.counts.keySet());
        Collections.sort(keys);
        for(String key: keys){
            System.out.println(key+" : "+this.counts.get(key));

        }
    }

完成代码如下:

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import java.util.*;

/**
 * Created by chenhong on 16/1/25.
 */
public class ReportBolt extends BaseRichBolt {
    private HashMap counts =null;

    public void prepare(Map config, TopologyContext context, OutputCollector collector){
        this.counts = new HashMap();
    }

    public void execute(Tuple tuple){
        String word = tuple.getStringByField("word");
        Long count = tuple.getLongByField("count");
        this.counts.put(word,count);

    }

    /*
     该bolt位于末端,所以declareOutputFields为空
    **/
    public void declareOutputFields(OutputFieldsDeclarer declarer){

    }


    /*
     cleanup方法用来释放bolt占用的资源
     */
    public void cleanup(){
        System.out.println("--- FINAL COUNTS ---");
        List keys = new ArrayList();
        keys.addAll(this.counts.keySet());
        Collections.sort(keys);
        for(String key: keys){
            System.out.println(key+" : "+this.counts.get(key));

        }
    }
}

创建WordCountTopology

现在已经实现了一个 SentenceSpout, SplitSentenceBolt,WordCountBolt,ReportBolt。 我们需要将SentenceSpout作为数据源,将句子发射给SplitSentenceBolt, SplitSentenceBolt将接收到的句子按照空格进行分割成单词发射给WordCountBolt,WordCountBolt对单词进行计数,并将计数结果发送给ReportBolt,注意WordCountBolt的计数结果随着程序进行随时变化,因此ReportBolt中的计数结果也随时更新。在程序结束后,将ReportBolt记录的单词和计数打印出来。完成代码如下:

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

/**
 * Created by chenhong on 16/1/25.
 */
public class WordCountTopology {

    private static final String SENTENCE_SPOUT_ID="sentence-spout";
    private static final String SPILL_BOLT_ID ="split-bolt";
    private static final String COUNT_BOLT_ID ="count-bolt";
    private static final String REPORT_BOLT_ID="report-bolt";
    private static final String TOPOLOGY_NAME="word-count-topology";

    public static void main(String[] args){

        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();

        TopologyBuilder builder = new TopologyBuilder();

        //注册一个sentence spout并且赋值给其唯一的ID
        builder.setSpout(SENTENCE_SPOUT_ID, spout);
        //注册一个splitsentencebolt ,这个bolt订阅sentencespout发射出来的数据流,shuffleGrouping方法告诉
        //storm要将类sentenceSpout发射的tuple随机均匀地分发给SplitSentenceBolt实例
        builder.setBolt(SPILL_BOLT_ID,splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
        //fieldsGrouping()方法来保证所有 word字段值相同的tuple会被路由到同一个wordcountbolt实例中
        builder.setBolt(COUNT_BOLT_ID,countBolt).fieldsGrouping(SPILL_BOLT_ID, new Fields("word"));
        //globalGrouping方法将WordCountBolt发射的所有tuple路由到唯一的ReportBolt任务中
        builder.setBolt(REPORT_BOLT_ID,reportBolt).globalGrouping(COUNT_BOLT_ID);

        //config对象代表了对topology所有组件全局生效的配置参数集合,会分发给各个spout和bolt的open(),prepare()方法
        Config config = new Config();
        //LocalCluster类在本地开发环境来模拟一个完整的storm集群
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGY_NAME,config,builder.createTopology());
        Utils.waitForSeconds(5);
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();
    }
}

运行结果

注意单词计数的结果与机器性能有关

a : 432
ate : 432
beverages : 432
cold : 432
cow : 432
dog : 864
don't : 864
fleas : 864
has : 432
hava : 432
homework : 432
i : 1296
like : 864
man : 432
my : 864
the : 432
think : 432 
相关TAG标签
上一篇:MySQL数据库开发
下一篇:图解微软ReportBuilder3连接SqlServer2008数据库表创建报表
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站