RocketMQ

简介

RocketMQ单机支持1万以上的持久化队列,前提是足够的内存、硬盘空间,过期数据数据删除(RocketMQ中的消息队列长度不是无限的,只是足够大的内存+数据定时删除)
RocketMQ RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:

支持严格的消息顺序
支持Topic与Queue两种模式
亿级消息堆积能力
比较友好的分布式特性
同时支持Push与Pull方式消费消息
高效的订阅者水平扩展能力
实时的消息订阅机制

基本概念

看完其实还是不懂,所以我们先了解几个基本概念:

名称 含义
Producer 消息生产者,负责产生消息,一般由业务系统负责产生消息
Consumer 消息消费者,负责消费消息,一般是后台系统负责异步消费
Push Consumer Consumer的一种,应用通常向Consumer对象注册一个Listener接口,一旦收到消息,Consumer对象立刻回调Listener接口方法
Pull Consumer Consumer的一种,应用通常主动调用Consumer的拉消息方法从Broker拉消息,主动权由应用控制
Producer Group 一类Producer的集合名称,这类Producer通常发送一类消息,且发送逻辑一致
Consumer Group 一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致
Broker 消息中转角色,负责存储消息,转发消息,一般也称为Server。
长连接 在页面嵌入一个隐藏的Iframe,将这个隐藏Iframe的src属性设置为对一个长连接的请求或是采用xhr请求,服务器端就源源不断的向客户端输入数据。

安装RocketMQ

部署并启动NameServer:

  1. 安装JDK并设置JAVA_HOME
    详情请看JDK安装
  2. 下载rocketmq 下载地址:https://github.com/alibaba/RocketMQ
  3. 配置rocketmq的环境变量,在/etc/profile最后添加
    export ROCKETMQ_HOME=/opt/alibaba-rocketmq
    export PATH=$JAVA_HOME/bin:$ROCKETMQ_HOME/bin:$PATH
    
    然后执行source /etc/profile使配置生效
  4. 给命令添加指向权限
    cd /opt/alibaba-rocketmq/bin/;
    chmod +x mqadmin mqbroker mqfiltersrv mqshutdown  mqnamesrv
    
  5. 新建日志目录
    cd /opt/alibaba-rocketmq
    mkdir log
    
  6. 启动NameServer
    nohup mqnamesrv 1>/opt/alibaba-rocketmq/log/ng.log 2>/opt/alibaba-rocketmq/log/ng-err.log &
    
  7. 查看状态
    $ps aux|grep java
    125233   12248 21.1  0.9 7151512 75844 pts/1   Sl   11:37   0:01 /opt/java/jdk1.8.0_45/bin/java -server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:+DisableExplicitGC -verbose:gc -Xloggc:/home/xiaolong.xiao/rmq_srv_gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -Djava.ext.dirs=/opt/alibaba-rocketmq/bin/../lib -cp .:/opt/alibaba-rocketmq/bin/../conf:.:/opt/java/jdk1.8.0_45/lib/dt.jar:/opt/java/jdk1.8.0_45/lib/tools.jar com.alibaba.rocketmq.namesrv.NamesrvStartup
    
  8. 验证NameServer是否已经启动
    $tail -f /opt/alibaba-rocketmq/log/ng.log
    The Name Server boot success.
    

部署Brokeer:消息中转角色,负责存储消息,转发消息

Broker集群有多种配置方式:

  1. 单Master

    优点:除了配置简单没什么优点

    缺点:不可靠,该机器重启或宕机,将导致整个服务不可用

  2. 多Master

    优点:配置简单,性能最高

    缺点:可能会有少量消息丢失(配置相关),单台机器重启或宕机期间,该机器下未被消费的消息在机器恢复前不可订阅,影响消息实时性

  3. 多Master多Slave,每个Master配一个Slave,有多对Master-Slave,集群采用异步复制方式,主备有短暂消息延迟,毫秒级

    优点:性能同多Master几乎一样,实时性高,主备间切换对应用透明,不需人工干预

    缺点:Master宕机或磁盘损坏时会有少量消息丢失

  4. 多Master多Slave,每个Master配一个Slave,有多对Master-Slave,集群采用同步双写方式,主备都写成功,向应用返回成功

    优点:服务可用性与数据可用性非常高

    缺点:性能比异步集群略低,当前版本主宕备不能自动切换为主

Master和Slave的配置文件参考conf目录下的配置文件

Master与Slave通过指定相同的brokerName参数来配对,Master的BrokerId必须是0,Slave的BrokerId必须是大于0的数

一个Master下面可以挂载多个Slave,同一Master下的多个Slave通过指定不同的BrokerId来区分

部署一Master一Slave,集群采用异步复制方式:

Master:

[hadoop@hadoop bin]$ nohup sh mqbroker -n "192.168.58.163:9876" -c /opt/alibaba-rocketmq/conf/2m-2s-async/broker-a.properties 1>/opt/alibaba-rocketmq/log/broker.log 2>/opt/alibaba-rocketmq/log/broker-err.log&
[2] 25493
[hadoop@hadoop bin]$ jps
25500 BrokerStartup
25545 Jps
17682 NamesrvStartup

Slave:

[hadoop@hadoop bin]$ nohup sh mqbroker -n "192.168.58.163:9876" -c /opt/alibaba-rocketmq/conf/2m-2s-async/broker-a-s.properties 1>/opt/alibaba-rocketmq/log/broker.log 2>/opt/alibaba-rocketmq/log/broker-err.log&
[1] 1974
[hadoop@hadoop bin]$ jps
2071 Jps
1981 BrokerStartup

Broker监听端口:10911
如果服务器内存不够,可以修改runbroker.sh脚本(mqbroker文件中通过runbroker.sh脚本调用Broker的主函数com.alibaba.rocketmq.broker.BrokerStartup启动Broker)的JAVA_OPT_1参数

具体的配置文件可以参照Conf下面的配置。

Producer

开发之前首先加上依赖包

<!-- RocketMQ Java SDK -->
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>3.2.6</version>
</dependency>

Consumer

其他资料

RocketMQ性能压测分析 RocketMQ与Kafka对比(18项差异) 分布式消息队列RocketMQ部署与监控 RocketMQ入门

results matching ""

    No results matching ""