头部
logo
GT_WUBA

RocketMQ笔记1-简介-单点模式-生产者消费者的使用-工作流程

2019年10月31日 /By wuba/浏览量:21

简介


单机版安装

  • 通过docker安装RocketMQ Server + Broker + Console,至少需要 2G 内存

  • docker-compose.yml 如下:

version: '3.5'
services:
  rmqnamesrv:
    image: foxiswho/rocketmq:server
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    volumes:
      - ./data/logs:/opt/logs
      - ./data/store:/opt/store
    networks:
      rmq:
        aliases:
          - rmqnamesrv

  rmqbroker:
    image: foxiswho/rocketmq:broker
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
    volumes:
      - ./data/logs:/opt/logs
      - ./data/store:/opt/store
      - ./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf
    environment:
      NAMESRV_ADDR: "rmqnamesrv:9876"
      JAVA_OPTS: " -Duser.home=/opt"
      JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
    command: mqbroker -c /etc/rocketmq/broker.conf
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqbroker

  rmqconsole:
    image: styletang/rocketmq-console-ng
    container_name: rmqconsole
    ports:
      - 8080:8080
    environment:
      JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqconsole

networks:
  rmq:
    name: rmq
    driver: bridge
  • broker.conf

RocketMQ Broker 需要一个配置文件,按照上面的 Compose 配置,我们需要在 ./data/brokerconf/ 目录下创建一个名为 broker.conf 的配置文件,内容如下:

 # 所属集群名字
brokerClusterName=DefaultCluster

  # broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,
  # 在 broker-b.properties 使用: broker-b
brokerName=broker-a

  # 0 表示 Master,> 0 表示 Slave
brokerId=0

  # nameServer地址,分号分割
  # namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876

  # 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
  # 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
  # brokerIP1=192.168.0.253

  # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4

  # 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
autoCreateTopicEnable=true

  # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true

  # Broker 对外服务的监听端口
listenPort=10911

  # 删除文件时间点,默认凌晨4点
deleteWhen=04

  # 文件保留时间,默认48小时
fileReservedTime=120

  # commitLog 每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824

  # ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000

  # destroyMapedFileIntervalForcibly=120000
  # redeleteHangedFileInterval=120000
  # 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
  # 存储路径
  # storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
  # commitLog 存储路径
  # storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
  # 消费队列存储
  # storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
  # 消息索引存储路径
  # storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
  # checkpoint 文件存储路径
  # storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
  # abort 文件存储路径
  # abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
  # 限制的消息大小
maxMessageSize=65536

  # flushCommitLogLeastPages=4
  # flushConsumeQueueLeastPages=2
  # flushCommitLogThoroughInterval=10000
  # flushConsumeQueueThoroughInterval=60000

  # Broker 的角色
  # - ASYNC_MASTER 异步复制Master
  # - SYNC_MASTER 同步双写Master
  # - SLAVE
brokerRole=ASYNC_MASTER

  # 刷盘方式
  # - ASYNC_FLUSH 异步刷盘
  # - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH

# 发消息线程池数量
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128
  • 启动

 docker-compose up -d
  • 访问控制台

 http://ip:8080

生产者的使用

  • 创建生产者对象DefaultMQProducer

  • 设置NamesrvAddr

  • 启动生产者服务

  • 创建消息并发送

 package com.wu.mq.quickstart;

  import com.wu.mq.constants.Const;
  import org.apache.rocketmq.client.exception.MQBrokerException;
  import org.apache.rocketmq.client.exception.MQClientException;
  import org.apache.rocketmq.client.producer.DefaultMQProducer;
  import org.apache.rocketmq.client.producer.SendResult;
  import org.apache.rocketmq.common.message.Message;
  import org.apache.rocketmq.remoting.exception.RemotingException;

  /**
  * @author :wuba
  * @date :Created in 2019/10/27 21:38
  * @description:消息生产者
  */

  public class Producer {
  public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
  //创建生产者对象
  DefaultMQProducer producer = new DefaultMQProducer("test_producerGroup");
  //设置Name Server addr
  producer.setNamesrvAddr(Const.NAMESRV_ADDR);
  //启动服务
  producer.start();
  //创建消息并发送
  for (int i = 0; i < 5; i++) {
  Message message = new Message("test_topic", "TagA", "Key" + i, ("Hello,RocketMQ" + i).getBytes());
  SendResult sr = producer.send(message);
  System.out.println(sr);
  }
  //关闭
  producer.shutdown();
  }
}

消费者的使用

  • 创建消费者对象DefaultMQPushConsumer

  • 设置NamesrvAddr 及消费位置ConsumeFromWhere

  • 进行订阅主题subscribe

  • 注册监听并消费 registerMessageListener

package com.wu.mq.quickstart;

import com.wu.mq.constants.Const;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;

/**
 * @author :wuba
 * @date :Created in 2019/10/27 21:39
 * @description:消息消费者
 */

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumerGroup");
        //设置NamesrvAddr 及消费位置ConsumeFromWhere
        consumer.setNamesrvAddr(Const.NAMESRV_ADDR);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        //订阅topic,  * 指该主题下的所有消息都能消费
        consumer.subscribe("test_topic","*");
        //注册监听并消费
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                MessageExt msg = list.get(0);
                try {
                    String topic = msg.getTopic();
                    String tags = msg.getTags();
                    String keys = msg.getKeys();
                    if("Key3".equals(keys)){
                        System.out.println("消息消费失败...");
                        int i=1/0;
                    }
                    String msgBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                    System.err.println("topic: " + topic + ",tags: " + tags + ", keys: " + keys + ",body: " + msgBody);
                }catch (Exception e){
                    e.printStackTrace();
                    int reconsumeTimes = msg.getReconsumeTimes();
                    System.out.println("reconsumeTimes :"+reconsumeTimes);
                    if(reconsumeTimes == 2){
                        //记录日志,补偿处理
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费服务
        System.out.println("消费服务启动...");
        consumer.start();
    }
}

RocketMQ拓扑图

1572518806413010955.png


  • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

  • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。

  • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

  • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。


工作流程

  • 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。

  • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。

  • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。

  • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。

  • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。


参考

 RocketMQ开发者指南