RocketMQ
简介
RocketMQ单机支持1万以上的持久化队列,前提是足够的内存、硬盘空间,过期数据数据删除(RocketMQ中的消息队列长度不是无限的,只是足够大的内存+数据定时删除)
RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:
支持严格的消息顺序
支持Topic与Queue两种模式
亿级消息堆积能力
比较友好的分布式特性
同时支持Push与Pull方式消费消息
高效的订阅者水平扩展能力
实时的消息订阅机制
基本概念
看完其实还是不懂,所以我们先了解几个基本概念:
名称 | 含义 |
---|---|
Producer | 消息生产者,负责产生消息,一般由业务系统负责产生消息 |
Consumer | 消息消费者,负责消费消息,一般是后台系统负责异步消费 |
Push Consumer | Consumer的一种,应用通常向Consumer对象注册一个Listener接口,一旦收到消息,Consumer对象立刻回调Listener接口方法 |
Pull Consumer | Consumer的一种,应用通常主动调用Consumer的拉消息方法从Broker拉消息,主动权由应用控制 |
Producer Group | 一类Producer的集合名称,这类Producer通常发送一类消息,且发送逻辑一致 |
Consumer Group | 一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致 |
Broker | 消息中转角色,负责存储消息,转发消息,一般也称为Server。 |
长连接 | 在页面嵌入一个隐藏的Iframe,将这个隐藏Iframe的src属性设置为对一个长连接的请求或是采用xhr请求,服务器端就源源不断的向客户端输入数据。 |
安装RocketMQ
部署并启动NameServer:
- 安装JDK并设置JAVA_HOME
详情请看JDK安装 - 下载rocketmq 下载地址:https://github.com/alibaba/RocketMQ
- 配置rocketmq的环境变量,在/etc/profile最后添加
然后执行export ROCKETMQ_HOME=/opt/alibaba-rocketmq export PATH=$JAVA_HOME/bin:$ROCKETMQ_HOME/bin:$PATH
source /etc/profile
使配置生效 - 给命令添加指向权限
cd /opt/alibaba-rocketmq/bin/; chmod +x mqadmin mqbroker mqfiltersrv mqshutdown mqnamesrv
- 新建日志目录
cd /opt/alibaba-rocketmq mkdir log
- 启动NameServer
nohup mqnamesrv 1>/opt/alibaba-rocketmq/log/ng.log 2>/opt/alibaba-rocketmq/log/ng-err.log &
- 查看状态
$ps aux|grep java 125233 12248 21.1 0.9 7151512 75844 pts/1 Sl 11:37 0:01 /opt/java/jdk1.8.0_45/bin/java -server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:+DisableExplicitGC -verbose:gc -Xloggc:/home/xiaolong.xiao/rmq_srv_gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -Djava.ext.dirs=/opt/alibaba-rocketmq/bin/../lib -cp .:/opt/alibaba-rocketmq/bin/../conf:.:/opt/java/jdk1.8.0_45/lib/dt.jar:/opt/java/jdk1.8.0_45/lib/tools.jar com.alibaba.rocketmq.namesrv.NamesrvStartup
- 验证NameServer是否已经启动
$tail -f /opt/alibaba-rocketmq/log/ng.log The Name Server boot success.
部署Brokeer:消息中转角色,负责存储消息,转发消息
Broker集群有多种配置方式:
单Master
优点:除了配置简单没什么优点
缺点:不可靠,该机器重启或宕机,将导致整个服务不可用
多Master
优点:配置简单,性能最高
缺点:可能会有少量消息丢失(配置相关),单台机器重启或宕机期间,该机器下未被消费的消息在机器恢复前不可订阅,影响消息实时性
多Master多Slave,每个Master配一个Slave,有多对Master-Slave,集群采用异步复制方式,主备有短暂消息延迟,毫秒级
优点:性能同多Master几乎一样,实时性高,主备间切换对应用透明,不需人工干预
缺点:Master宕机或磁盘损坏时会有少量消息丢失
多Master多Slave,每个Master配一个Slave,有多对Master-Slave,集群采用同步双写方式,主备都写成功,向应用返回成功
优点:服务可用性与数据可用性非常高
缺点:性能比异步集群略低,当前版本主宕备不能自动切换为主
Master和Slave的配置文件参考conf目录下的配置文件
Master与Slave通过指定相同的brokerName参数来配对,Master的BrokerId必须是0,Slave的BrokerId必须是大于0的数
一个Master下面可以挂载多个Slave,同一Master下的多个Slave通过指定不同的BrokerId来区分
部署一Master一Slave,集群采用异步复制方式:
Master:
[hadoop@hadoop bin]$ nohup sh mqbroker -n "192.168.58.163:9876" -c /opt/alibaba-rocketmq/conf/2m-2s-async/broker-a.properties 1>/opt/alibaba-rocketmq/log/broker.log 2>/opt/alibaba-rocketmq/log/broker-err.log&
[2] 25493
[hadoop@hadoop bin]$ jps
25500 BrokerStartup
25545 Jps
17682 NamesrvStartup
Slave:
[hadoop@hadoop bin]$ nohup sh mqbroker -n "192.168.58.163:9876" -c /opt/alibaba-rocketmq/conf/2m-2s-async/broker-a-s.properties 1>/opt/alibaba-rocketmq/log/broker.log 2>/opt/alibaba-rocketmq/log/broker-err.log&
[1] 1974
[hadoop@hadoop bin]$ jps
2071 Jps
1981 BrokerStartup
Broker监听端口:10911
如果服务器内存不够,可以修改runbroker.sh脚本(mqbroker文件中通过runbroker.sh脚本调用Broker的主函数com.alibaba.rocketmq.broker.BrokerStartup启动Broker)的JAVA_OPT_1参数
具体的配置文件可以参照Conf下面的配置。
Producer
开发之前首先加上依赖包
<!-- RocketMQ Java SDK -->
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.2.6</version>
</dependency>
Consumer
其他资料
RocketMQ性能压测分析 RocketMQ与Kafka对比(18项差异) 分布式消息队列RocketMQ部署与监控 RocketMQ入门