The previous articles covered how to set up a Storm cluster. This one looks at how to write Storm code, build it with Maven, and test it against a simulated local cluster.
Only one dependency is needed in the Maven configuration: Storm 0.9.2.
```xml
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.2-incubating</version>
</dependency>
```
Create SentenceSpout as the topology's data source. It extends BaseRichSpout and must override a few methods.
```java
/*
 * Called when the spout is initialized. The Map holds the Storm
 * configuration, the TopologyContext provides information about the
 * components in the topology, and the SpoutOutputCollector provides
 * the methods for emitting tuples.
 */
public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
    this.collector = collector;
}
```
open is called when the spout is initialized; the SpoutOutputCollector passed in provides the methods for emitting the output stream.
```java
/* Declares that the spout emits one stream whose tuples contain a single field, "sentence". */
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("sentence"));
}
```
declareOutputFields declares that the spout emits one stream whose tuples contain a single field named sentence.
```java
/* Storm calls this method to have the spout emit tuples to the collector. */
public void nextTuple() {
    if (index < sentences.length) {
        this.collector.emit(new Values(sentences[index]));
        index++;
    } else {
        index = 0; // without resetting index, the sentences would be emitted only once
    }
    Utils.waitForMillis(1);
}
```
Storm calls the spout's nextTuple repeatedly, so this is where we emit the output stream via collector.emit().
The complete spout:
```java
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.Map;

/**
 * Created by chenhong on 16/1/25.
 */
public class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private String[] sentences = {
            "my dog has fleas",
            "i like cold beverages",
            "the dog ate my homework",
            "don't hava a cow man ",
            "i don't think i like fleas"
    };
    private int index = 0;

    /* Declares that the spout emits one stream whose tuples contain a single field, "sentence". */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    /*
     * Called when the spout is initialized. The Map holds the Storm configuration,
     * the TopologyContext provides information about the components in the topology,
     * and the SpoutOutputCollector provides the methods for emitting tuples.
     */
    public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /* Storm calls this method to have the spout emit tuples to the collector. */
    public void nextTuple() {
        if (index < sentences.length) {
            this.collector.emit(new Values(sentences[index]));
            index++;
        } else {
            index = 0; // without resetting index, the sentences would be emitted only once
        }
        // Utils is a small sleep helper used by these examples,
        // not storm-core's backtype.storm.utils.Utils.
        Utils.waitForMillis(1);
    }
}
```
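The spout above (and the topology driver later) calls `Utils.waitForMillis` / `Utils.waitForSeconds`. These are not part of storm-core, whose `backtype.storm.utils.Utils` offers `sleep` instead; a minimal helper class like the following, assumed here, is enough:

```java
public class Utils {
    // Sleep for the given number of milliseconds, restoring
    // the interrupt flag if the thread is interrupted.
    public static void waitForMillis(long milliseconds) {
        try {
            Thread.sleep(milliseconds);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Convenience wrapper for whole seconds.
    public static void waitForSeconds(int seconds) {
        waitForMillis(seconds * 1000L);
    }
}
```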
Create SplitSentenceBolt, which splits the sentences emitted by the spout into words on spaces. It extends BaseRichBolt and must override the following methods:
```java
/* Called when the bolt is initialized. */
public void prepare(Map config, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
}
```
prepare is called when the bolt is initialized, much like a spout's open method; the OutputCollector is used to emit the bolt's output stream.
```java
/* Declares that each output tuple contains a single field, "word". */
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
}
```
declareOutputFields declares that this bolt emits one stream whose tuples contain a single field named word.
```java
/* Called every time a tuple is received from the subscribed stream. */
public void execute(Tuple tuple) {
    String sentence = tuple.getStringByField("sentence");
    String[] words = sentence.split(" ");
    for (String word : words) {
        this.collector.emit(new Values(word));
    }
}
```
This method is called whenever the bolt receives a tuple from the upstream stream; here it splits the sentence on spaces and emits one tuple per word.
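The splitting itself is plain `String.split`. One detail worth knowing: Java's split discards trailing empty strings, so the sample sentence with a trailing space ("don't hava a cow man ") still yields only real words:

```java
public class SplitDemo {
    public static void main(String[] args) {
        // Trailing space: Java's split(" ") discards trailing empty strings.
        String[] words = "don't hava a cow man ".split(" ");
        for (String word : words) {
            System.out.println(word);
        }
        System.out.println(words.length); // prints 5
    }
}
```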
The complete code:
```java
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

/**
 * Created by chenhong on 16/1/25.
 */
public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    /* Called when the bolt is initialized. */
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /* Declares that each output tuple contains a single field, "word". */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    /* Called every time a tuple is received from the subscribed stream. */
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            this.collector.emit(new Values(word));
        }
    }
}
```
WordCountBolt counts the words emitted by SplitSentenceBolt. The methods it must implement mirror those of SplitSentenceBolt, so they are not explained again. The complete code:
```java
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

/**
 * Created by chenhong on 16/1/25.
 */
public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        this.collector.emit(new Values(word, count));
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}
```
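The null check in execute() is the classic map-increment idiom: the first time a word is seen, its count is missing, so it starts from zero. In isolation (a sketch outside Storm):

```java
import java.util.HashMap;

public class IncrementDemo {
    public static void main(String[] args) {
        HashMap<String, Long> counts = new HashMap<String, Long>();
        for (String word : new String[]{"dog", "fleas", "dog"}) {
            Long count = counts.get(word);
            if (count == null) {
                count = 0L; // first sighting of this word
            }
            count++;
            counts.put(word, count);
        }
        System.out.println(counts.get("dog"));   // 2
        System.out.println(counts.get("fleas")); // 1
    }
}
```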
ReportBolt is the final bolt; its job is to print the word counts (or persist them to a database), so it emits no further stream and its declareOutputFields declares nothing.
```java
/* This bolt sits at the end of the topology, so declareOutputFields is empty. */
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
```
Once counting is finished, the results are printed, so we also override the cleanup method:
```java
/* cleanup is used to release resources held by the bolt; here it prints the final counts. */
public void cleanup() {
    System.out.println("--- FINAL COUNTS ---");
    List<String> keys = new ArrayList<String>();
    keys.addAll(this.counts.keySet());
    Collections.sort(keys);
    for (String key : keys) {
        System.out.println(key + " : " + this.counts.get(key));
    }
}
```
The complete code:
```java
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import java.util.*;

/**
 * Created by chenhong on 16/1/25.
 */
public class ReportBolt extends BaseRichBolt {
    private HashMap<String, Long> counts = null;

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.counts = new HashMap<String, Long>();
    }

    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = tuple.getLongByField("count");
        this.counts.put(word, count);
    }

    /* This bolt sits at the end of the topology, so declareOutputFields is empty. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    /* cleanup is used to release resources held by the bolt; here it prints the final counts. */
    public void cleanup() {
        System.out.println("--- FINAL COUNTS ---");
        List<String> keys = new ArrayList<String>();
        keys.addAll(this.counts.keySet());
        Collections.sort(keys);
        for (String key : keys) {
            System.out.println(key + " : " + this.counts.get(key));
        }
    }
}
```
We now have SentenceSpout, SplitSentenceBolt, WordCountBolt, and ReportBolt. SentenceSpout is the data source and emits sentences to SplitSentenceBolt; SplitSentenceBolt splits each sentence into words on spaces and emits them to WordCountBolt; WordCountBolt counts the words and sends the running counts to ReportBolt. Note that WordCountBolt's counts change continuously as the program runs, so the counts held by ReportBolt are updated continuously as well. When the program ends, ReportBolt prints the words and their counts. The complete code:
```java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

/**
 * Created by chenhong on 16/1/25.
 */
public class WordCountTopology {
    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();

        TopologyBuilder builder = new TopologyBuilder();
        // Register the sentence spout and assign it a unique ID.
        builder.setSpout(SENTENCE_SPOUT_ID, spout);
        // Register the SplitSentenceBolt, subscribed to the stream emitted by SentenceSpout.
        // shuffleGrouping tells Storm to distribute SentenceSpout's tuples
        // randomly and evenly across the SplitSentenceBolt instances.
        builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
        // fieldsGrouping guarantees that all tuples with the same "word" value
        // are routed to the same WordCountBolt instance.
        builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        // globalGrouping routes all tuples emitted by WordCountBolt to the single ReportBolt task.
        builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);

        // The Config object holds configuration that applies globally to every component
        // of the topology; it is handed to each spout's open() and each bolt's prepare().
        Config config = new Config();
        // LocalCluster simulates a full Storm cluster in the local development environment.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
        // Utils here is the same sleep helper used by SentenceSpout.
        Utils.waitForSeconds(5);
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();
    }
}
```
Note that the exact counts depend on machine performance (i.e. how many passes the spout makes over its sentences in five seconds):
```
--- FINAL COUNTS ---
a : 432
ate : 432
beverages : 432
cold : 432
cow : 432
dog : 864
don't : 864
fleas : 864
has : 432
hava : 432
homework : 432
i : 1296
like : 864
man : 432
my : 864
the : 432
think : 432
```
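While the absolute numbers vary from run to run, the ratios between words are fixed by the sample data: each full pass over the five sentences contributes one count per occurrence, so a word's total is (occurrences per pass) times (number of passes). Counting one pass directly confirms this, e.g. "dog" appears in two sentences (864 = 2 x 432) and "i" appears three times in total (1296 = 3 x 432):

```java
import java.util.HashMap;
import java.util.Map;

public class CountCheck {
    public static void main(String[] args) {
        String[] sentences = {
                "my dog has fleas",
                "i like cold beverages",
                "the dog ate my homework",
                "don't hava a cow man ",
                "i don't think i like fleas"
        };
        // Count word occurrences in a single pass over the sample sentences.
        Map<String, Long> perPass = new HashMap<String, Long>();
        for (String sentence : sentences) {
            for (String word : sentence.split(" ")) {
                Long count = perPass.get(word);
                perPass.put(word, count == null ? 1L : count + 1);
            }
        }
        System.out.println(perPass.get("dog")); // 2
        System.out.println(perPass.get("i"));   // 3
        System.out.println(perPass.get("my"));  // 2
    }
}
```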