首页 > 程序开发 > 综合编程 > 其他综合 > 正文
消息中间件-activemq实战之消息持久化(六)
2017-05-17       个评论    来源:rickiyang的博客  
收藏    我要投稿

消息中间件-activemq实战之消息持久化。对于activemq消息的持久化我们在第二节的时候就简单介绍过,今天我们详细的来分析一下activemq的持久化过程以及持久化插件。在生产环境中为确保消息的可靠性,我们肯定的面临持久化消息的问题,今天就一起来攻克他吧。

1. 持久化方式介绍

前面我们也简单提到了activemq提供的插件式的消息存储,在这里再提一下,主要有以下几种方式:

AMQ消息存储-基于文件的存储方式,是activemq开始的版本默认的消息存储方式; KahaDB消息存储-提供了容量的提升和恢复能力,是现在的默认存储方式; JDBC消息存储-消息基于JDBC存储的; Memory消息存储-基于内存的消息存储,由于内存不属于持久化范畴,而且如果使用内存队列,可以考虑使用更合适的产品,如ZeroMQ。所以内存存储不在讨论范围内。

上面几种消息存储方式对于消息存储的逻辑来说并没有什么区别,只是在性能以及存储方式上来说有所不同。但是对于消息发送的方式来说,p2p和Pub/Sub两种类型的消息他们的持久化方式却是不同的:

对于点对点的消息一旦消费者完成消费这条消息将从broker上删除;对于发布订阅类型的消息,即使所有的订阅者都完成了消费,Broker也不一定会马上删除无用消息,而是保留推送历史,之后会异步清除无用消息。而每个订阅者消费到了哪条消息的offset会记录在Broker,以免下次重复消费。因为消息是顺序消费,先进先出,所以只需要记录上次消息消费到哪里就可以了。

因为AMQ现在已经被不再使用被KahaDB所替代,所以我们就讲KahaDB,JDBC消息存储在许多对可靠性要求高而对性能要求低一些的大公司还是经常使用的,下面我们就这两种持久化方式的使用做一节专题。

2. Kahadb

说到Kahadb之前我们还是得提到他的前身AMQ,AMQ是一种文件存储形式,他具有写入速度快和容易恢复的特点,消息存储在一个个的文件里,文件默认大小为32M,超过这个大小的消息将会存入下一个文件。当一个文件中的消息已经全部消费,那么这个文件将被标志我可删除,在下一个清除阶段这个文件将被删除。

如果需要使用持久化,则需要在前文中的配置文件applicationContext-ActiveMQ.xml中增加如下配置:

KahaDB的属性件下表格:

属性名称属性值描述

directoryactivemq-data消息文件和日志的存储目录

indexWriteBatchSize1000一批索引的大小,当要更新的索引量到达这个值时,更新到消息文件中

indexCacheSize1000内存中,索引的页大小

enableIndexWriteAsyncfalse索引是否异步写到消息文件中

journalMaxFileLength32mb一个消息文件的大小

enableJournalDiskSyncstrue是否讲非事务的消息同步写入到磁盘

cleanupInterval30000清除操作周期,单位ms

checkpointInterval5000索引写入到消息文件的周期,单位ms

ignoreMissingJournalfilesfalse忽略丢失的消息文件,false,当丢失了消息文件,启动异常

checkForCorruptJournalFilesfalse检查消息文件是否损坏,true,检查发现损坏会尝试修复

checksumJournalFilesfalse产生一个checksum,以便能够检测journal文件是否损坏。

5.4版本之后有效的属性:

archiveDataLogsfalse当为true时,归档的消息文件被移到directoryArchive,而不是直接删除

directoryArchivenull存储被归档的消息文件目录

databaseLockedWaitDelay10000在使用负载时,等待获得文件锁的延迟时间,单位ms

maxAsyncJobs10000同个生产者产生等待写入的异步消息最大量

concurrentStoreAndDispatchTopicsfalse当写入消息的时候,是否转发主题消息

concurrentStoreAndDispatchQueuestrue当写入消息的时候,是否转发队列消息

5.6版本之后有效的属性:

archiveCorruptedIndexfalse是否归档错误的索引

由于在ActiveMQ V5.4+的版本中,KahaDB是默认的持久化存储方案。所以即使你不配置任何的KahaDB参数信息,ActiveMQ也会启动KahaDB。这种情况下,KahaDB文件所在位置是你的ActiveMQ安装路径下的/data/broker.Name/KahaDB子目录。其中{broker.Name}代表这个ActiveMQ服务节点的名称。下面我把刚启动服务并发送了消息之后的activemq安装目录打开给大家看看:

这里写图片描述

正式的生产环境还是建议在主配置文件中明确设置KahaDB的工作参数:

...

3. 关系型数据库存储方案

从ActiveMQ 4+版本开始,ActiveMQ就支持使用关系型数据库进行持久化存储——通过JDBC实现的数据库连接。可以使用的关系型数据库囊括了目前市面的主流数据库。

使用JDBC的方式持久化我们就得修改之前的配置文件:

将其中的这段配置:

修改为下面这段内容:

在结点之后,增加数据源的配置,如下:

还是在上一篇的实例工程中,我们改变一下applicationContext-ActiveMQ.xml的配置如下:

first-queue

此时,重新启动MQ,就会发现db数据库中多了三张表:activemq_acks,activemq_lock,activemq_msgs,OK,说明activemq已经持久化成功啦!

这里写图片描述

activemq_acks:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存,主要数据库字段如下:

container:消息的destination sub_dest:如果是使用static集群,这个字段会有集群其他系统的信息 client_id:每个订阅者都必须有一个唯一的客户端id用以区分 sub_name:订阅者名称 selector:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性and和or操作 last_acked_id:记录消费过的消息的id

activemq_lock:在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。这个表用于记录哪个Broker是当前的Master Broker。

activemq_msgs:用于存储消息,Queue和Topic都存储在这个表中。主要的数据库字段如下:

id:自增的数据库主键 container:消息的destination msgid_prod:消息发送者客户端的主键 msg_seq:是发送消息的顺序,msgid_prod+msg_seq可以组成jms的messageid expiration:消息的过期时间,存储的是从1970-01-01到现在的毫秒数 msg:消息本体的java序列化对象的二进制数据 priority:优先级,从0-9,数值越大优先级越高 activemq_acks用于存储订阅关系。如果是持久化topic,订阅者和服务器的订阅关系在这个表保存。

点击复制链接 与好友分享!回本站首页
上一篇:目标检测的图像特征提取之(一)HOG特征
下一篇:脚本语言和变易语言的区别
相关文章
图文推荐
文章
推荐
点击排行

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训
版权所有: 红黑联盟--致力于做实用的IT技术学习网站