Spring Boot优雅使用RocketMQ的方法实例
|
前言 MQ,是一种跨进程的通信机制,用于上下游传递消息。在传统的互联网架构中通常使用MQ来对上下游来做解耦合。 举例:当A系统对B系统进行消息通讯,如A系统发布一条系统公告,B系统可以订阅该频道进行系统公告同步,整个过程中A系统并不关系B系统会不会同步,由订阅该频道的系统自行处理。 什么是RocketMQ?# 官方说明: 随着使用越来越多的队列和虚拟主题,ActiveMQ IO模块遇到了瓶颈。我们尽力通过节流,断路器或降级来解决此问题,但效果不佳。因此,我们那时开始关注流行的消息传递解决方案Kafka。不幸的是,Kafka不能满足我们的要求,特别是在低延迟和高可靠性方面。 看到这里可以很清楚的知道RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。 具有以下特性:
RocketMQ环境安装# 下载地址:https://rocketmq.apache.org/dowloading/releases/ 从官方下载二进制或者源码来进行使用。源码编译需要Maven3.2x,JDK8 在根目录进行打包: mvn -Prelease-all -DskipTests clean packager -U distribution/target/apache-rocketmq文件夹中会存在一个文件夹版,zip,tar三个可运行的完整程序。 使用rocketmq-4.6.0.zip:
SpringBoot环境中使用RocketMQ# SpringBoot 入门:https://www.jb51.net/article/177449.htm SpringBoot 常用start:https://www.jb51.net/article/177451.htm 当前环境版本为:
在项目工程中导入:
<!-- MQ Begin -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<!-- MQ End -->
由于我们这边已经有工程了所以就不在进行创建这种过程了。主要是看看如何使用RocketMQ。 创建RocketMQProperties配置属性类,类中内容如下:
@ConfigurationProperties(prefix = "rocketmq")
public class RocketMQProperties {
private boolean isEnable = false;
private String namesrvAddr = "localhost:9876";
private String groupName = "default";
private int producerMaxMessageSize = 1024;
private int producerSendMsgTimeout = 2000;
private int producerRetryTimesWhenSendFailed = 2;
private int consumerConsumeThreadMin = 5;
private int consumerConsumeThreadMax = 30;
private int consumerConsumeMessageBatchMaxSize = 1;
//省略get set
}
现在我们所有子系统中的生产者,消费者对应: isEnable 是否开启mq namesrvAddr 集群地址 groupName 分组名称 设置为统一已方便系统对接,如有其它需求在进行扩展,类中我们已经给了默认值也可以在配置文件或配置中心中获取配置,配置如下: #发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示 rocketmq.groupName=please_rename_unique_group_name #是否开启自动配置 rocketmq.isEnable=true #mq的nameserver地址 rocketmq.namesrvAddr=127.0.0.1:9876 #消息最大长度 默认1024*4(4M) rocketmq.producer.maxMessageSize=4096 #发送消息超时时间,默认3000 rocketmq.producer.sendMsgTimeout=3000 #发送消息失败重试次数,默认2 rocketmq.producer.retryTimesWhenSendFailed=2 #消费者线程数量 rocketmq.consumer.consumeThreadMin=5 rocketmq.consumer.consumeThreadMax=32 #设置一次消费消息的条数,默认为1条 rocketmq.consumer.consumeMessageBatchMaxSize=1 创建消费者接口 RocketConsumer.java 该接口用户约束消费者需要的核心步骤:
/**
* 消费者接口
*
* @author SimpleWu
*
*/
public interface RocketConsumer {
/**
* 初始化消费者
*/
public abstract void init();
/**
* 注册监听
*
* @param messageListener
*/
public void registerMessageListener(MessageListener messageListener);
}
创建抽象消费者 AbstractRocketConsumer.java:
/**
* 消费者基本信息
*
* @author SimpelWu
*/
public abstract class AbstractRocketConsumer implements RocketConsumer {
protected String topics;
protected String tags;
protected MessageListener messageListener;
protected String consumerTitel;
protected MQPushConsumer mqPushConsumer;
/**
* 必要的信息
*
* @param topics
* @param tags
* @param consumerTitel
*/
public void necessary(String topics,String tags,String consumerTitel) {
this.topics = topics;
this.tags = tags;
this.consumerTitel = consumerTitel;
}
public abstract void init();
@Override
public void registerMessageListener(MessageListener messageListener) {
this.messageListener = messageListener;
}
}
在类中我们必须指定这个topics,tags与消息监听逻辑
接下来我们编写自动配置类RocketMQConfiguation.java,该类用户初始化一个默认的生产者连接,以及加载所有的消费者。 @EnableConfigurationProperties({ RocketMQProperties.class }) 使用该配置文件 @Configuration 标注为配置类 @ConditionalOnProperty(prefix = "rocketmq",value = "isEnable",havingValue = "true") 只有当配置中指定rocketmq.isEnable = true的时候才会生效 核心内容如下:
/**
* mq配置
*
* @author SimpleWu
*/
@Configuration
@EnableConfigurationProperties({ RocketMQProperties.class })
@ConditionalOnProperty(prefix = "rocketmq",havingValue = "true")
public class RocketMQConfiguation {
private RocketMQProperties properties;
private ApplicationContext applicationContext;
private Logger log = LoggerFactory.getLogger(RocketMQConfiguation.class);
public RocketMQConfiguation(RocketMQProperties properties,ApplicationContext applicationContext) {
this.properties = properties;
this.applicationContext = applicationContext;
}
/**
* 注入一个默认的消费者
* @return
* @throws MQClientException
*/
@Bean
public DefaultMQProducer getRocketMQProducer() throws MQClientException {
if (StringUtils.isEmpty(properties.getGroupName())) {
throw new MQClientException(-1,"groupName is blank");
}
if (StringUtils.isEmpty(properties.getNamesrvAddr())) {
throw new MQClientException(-1,"nameServerAddr is blank");
}
DefaultMQProducer producer;
producer = new DefaultMQProducer(properties.getGroupName());
producer.setNamesrvAddr(properties.getNamesrvAddr());
// producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
// 如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
// producer.setInstanceName(instanceName);
producer.setMaxMessageSize(properties.getProducerMaxMessageSize());
producer.setSendMsgTimeout(properties.getProducerSendMsgTimeout());
// 如果发送消息失败,设置重试次数,默认为2次
producer.setRetryTimesWhenSendFailed(properties.getProducerRetryTimesWhenSendFailed());
try {
producer.start();
log.info("producer is start ! groupName:{},namesrvAddr:{}",properties.getGroupName(),properties.getNamesrvAddr());
} catch (MQClientException e) {
log.error(String.format("producer is error {}",e.getMessage(),e));
throw e;
}
return producer;
}
/**
* SpringBoot启动时加载所有消费者
*/
@PostConstruct
public void initConsumer() {
Map<String,AbstractRocketConsumer> consumers = applicationContext.getBeansOfType(AbstractRocketConsumer.class);
if (consumers == null || consumers.size() == 0) {
log.info("init rocket consumer 0");
}
Iterator<String> beans = consumers.keySet().iterator();
while (beans.hasNext()) {
String beanName = (String) beans.next();
AbstractRocketConsumer consumer = consumers.get(beanName);
consumer.init();
createConsumer(consumer);
log.info("init success consumer title {},toips {},tags {}",consumer.consumerTitel,consumer.tags,consumer.topics);
}
}
/**
* 通过消费者信心创建消费者
*
* @param consumerPojo
*/
public void createConsumer(AbstractRocketConsumer arc) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(this.properties.getGroupName());
consumer.setNamesrvAddr(this.properties.getNamesrvAddr());
consumer.setConsumeThreadMin(this.properties.getConsumerConsumeThreadMin());
consumer.setConsumeThreadMax(this.properties.getConsumerConsumeThreadMax());
consumer.registerMessageListener(arc.messageListenerConcurrently);
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 如果非第一次启动,那么按照上次消费的位置继续消费
*/
// consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/**
* 设置消费模型,集群还是广播,默认为集群
*/
// consumer.setMessageModel(MessageModel.CLUSTERING);
/**
* 设置一次消费消息的条数,默认为1条
*/
consumer.setConsumeMessageBatchMaxSize(this.properties.getConsumerConsumeMessageBatchMaxSize());
try {
consumer.subscribe(arc.topics,arc.tags);
consumer.start();
arc.mqPushConsumer=consumer;
} catch (MQClientException e) {
log.error("info consumer title {}",arc.consumerTitel,e);
}
}
}
(编辑:安卓应用网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
