Reference 1: https://toscode.gitee.com/dongzhumao86/mqcloud
Reference (official documentation): https://github.com/sohutv/mqcloud/wiki
MQCloud is Sohu's open-source, enterprise-grade, one-stop service platform for RocketMQ.
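1. Creating a topic in MQCloud
Access URL (this is also the value of the domain configuration field):
dev: dev-mqcloud.xxx.com:80
*By default, every newly created topic must be approved by an administrator.
Topic format: {group}-{business}-topic
Consumer group: {group}-{business}-topic-consumer
Producer group: {group}-{business}-topic-producer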
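The naming convention above can be captured in a small helper so that the three names never drift apart. This is only a sketch: the class `MqNames` and its methods are hypothetical and are not part of MQCloud, and splitting a name into a "group" part and a "business" part (for example `chinese` and `middle-exam-answer`) is an assumption made for illustration.

```java
/** Hypothetical helper that derives MQCloud resource names from the naming convention. */
final class MqNames {
    private MqNames() {}

    /** Topic name: {group}-{business}-topic. */
    static String topic(String group, String business) {
        return requirePart(group, "group") + "-" + requirePart(business, "business") + "-topic";
    }

    /** Consumer group: the topic name with "-consumer" appended. */
    static String consumerGroup(String group, String business) {
        return topic(group, business) + "-consumer";
    }

    /** Producer group: the topic name with "-producer" appended. */
    static String producerGroup(String group, String business) {
        return topic(group, business) + "-producer";
    }

    // Rejects null or blank parts so that a malformed name is caught early.
    private static String requirePart(String value, String name) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException(name + " must not be blank");
        }
        return value.trim();
    }
}
```

For instance, `MqNames.consumerGroup("chinese", "middle-exam-answer")` yields `chinese-middle-exam-answer-topic-consumer`.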
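The following two business scenarios are used as examples throughout:
Student information sync topic: chinese-middle-exam-studentinfo-topic
Total messages per day: 10,000 (an estimate, not a limit)
Concurrent messages: 100 (an estimate, not a limit)
Consumer group: the topic name with **-consumer** appended
Flow control: 20 (the maximum number of messages each service instance consumes per second; this is an effective rate limit)
Producer group: the topic name with **-producer** appended
Answer topic: chinese-middle-exam-answer-topic
Total messages per day: 100,000 (an estimate, not a limit)
Concurrent messages: 1000 (an estimate, not a limit)
Consumer group: the topic name with **-consumer** appended
Flow control: 1000 (the maximum number of messages each service instance consumes per second; this is an effective rate limit. E.g. with 2 nodes in production, the theoretical combined throughput is 2000 QPS)
Producer group: the topic name with **-producer** appended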
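The arithmetic behind these numbers is simple enough to write down. The sketch below uses a hypothetical class `MqCapacity` (not an MQCloud API) to compute the theoretical consumption ceiling from the per-instance flow-control value, and the average rate implied by a daily volume.

```java
/** Hypothetical helper for the capacity arithmetic used when sizing a topic. */
final class MqCapacity {
    private MqCapacity() {}

    /** Theoretical ceiling on consumption: per-instance flow-control value times instance count. */
    static long maxConsumeQps(int perInstanceLimit, int instances) {
        if (perInstanceLimit < 0 || instances < 0) {
            throw new IllegalArgumentException("values must not be negative");
        }
        return (long) perInstanceLimit * instances;
    }

    /** Average rate, in messages per second, implied by a daily message volume. */
    static double averagePerSecond(long messagesPerDay) {
        return messagesPerDay / 86_400.0; // 24 * 60 * 60 seconds in a day
    }
}
```

With a flow-control value of 1000 and 2 instances, `maxConsumeQps(1000, 2)` gives the 2000 QPS quoted for the answer topic. The daily estimate of 100,000 messages averages out to only about 1.16 messages per second, which is why the concurrency estimate, not the daily total, drives the flow-control setting.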
2. Producer service integration
Branch: dev-mqcloud-jiangyuan
pom.xml

```xml
<!-- Goes inside <dependencies> -->
<dependency>
    <groupId>com.sohu.tv</groupId>
    <artifactId>mq-client-open</artifactId>
    <version>4.6.3</version>
</dependency>

<!-- Goes inside <repositories> -->
<repository>
    <id>sohu.nexus</id>
    <url>https://raw.github.com/sohutv/mvn_repo/master</url>
</repository>
```
Nacos MQCloud configuration (the retry count is configurable):

```yaml
xxx:
  mq:
    domain: mqcloud.xxx.com:80
    retryTimes: 2
    middleExamAnswer:
      topic: chinese-middle-exam-answer-topic
      producerGroup: chinese-middle-exam-answer-topic-producer
      consumerGroup: chinese-middle-exam-answer-topic-consumer
    middleExamStudentInfo:
      topic: chinese-middle-exam-studentinfo-topic
      producerGroup: chinese-middle-exam-studentinfo-topic-producer
      consumerGroup: chinese-middle-exam-studentinfo-topic-consumer
logging:
  level:
    RocketmqClient: error
    RocketmqCommon: error
    RocketmqRemoting: error
```
Configuration file:

```yaml
- data-id: xxx-mqcloud-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
  refresh: true
  group: zzz
```
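To make the data-id template concrete, the sketch below expands `${...}` placeholders from a map of properties. It only illustrates the substitution idea and is not how Spring or Nacos resolve placeholders internally. The class `PlaceholderDemo` is hypothetical, and the values `dev` and `yaml` used in the usage note are assumptions, not taken from the project.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical demo of ${key} substitution, to show what a data-id template expands to. */
final class PlaceholderDemo {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    private PlaceholderDemo() {}

    /** Replaces every ${key} in the template with props.get(key); fails on unknown keys. */
    static String resolve(String template, Map<String, String> props) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String key = matcher.group(1);
            String value = props.get(key);
            if (value == null) {
                throw new IllegalArgumentException("unresolved placeholder: " + key);
            }
            // quoteReplacement keeps '$' and '\' in the value from being treated specially.
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }
}
```

Assuming the active profile is `dev` and the file extension is `yaml`, resolving the template above would produce the data-id `xxx-mqcloud-dev.yaml`.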
Configuration class:

```java
@Configuration
@Slf4j
@SuppressWarnings("all")
public class MQConfiguration {

    @Value("${xxx.mq.domain}")
    private String domain;

    @Value("${xxx.mq.retryTimes}")
    private Integer retryTimes;

    @Value("${xxx.mq.middleExamAnswer.topic}")
    private String middleExamAnswerTopic;

    @Value("${xxx.mq.middleExamAnswer.producerGroup}")
    private String middleExamAnswerProducerGroup;

    @Value("${xxx.mq.middleExamStudentInfo.topic}")
    private String middleExamStudentInfoTopic;

    @Value("${xxx.mq.middleExamStudentInfo.producerGroup}")
    private String middleExamStudentInfoProducerGroup;

    // One producer bean per topic; Spring calls start() on init and shutdown() on destroy.
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public RocketMQProducer answerProducer() {
        RocketMQProducer producer = new RocketMQProducer(middleExamAnswerProducerGroup, middleExamAnswerTopic);
        producer.setMqCloudDomain(domain);
        producer.setDefaultRetryTimes(retryTimes);
        // Log messages that still failed after the configured retries.
        producer.setResendResultConsumer(result -> {
            if (!result.isSuccess) {
                log.info("MQCloud send failed, retried times: {}, message: {}", result.getRetriedTimes(), result.getMqMessage());
            }
        });
        return producer;
    }

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public RocketMQProducer studentInfoProducer() {
        RocketMQProducer producer = new RocketMQProducer(middleExamStudentInfoProducerGroup, middleExamStudentInfoTopic);
        producer.setMqCloudDomain(domain);
        producer.setDefaultRetryTimes(retryTimes);
        // Log messages that still failed after the configured retries.
        producer.setResendResultConsumer(result -> {
            if (!result.isSuccess) {
                log.info("MQCloud send failed, retried times: {}, message: {}", result.getRetriedTimes(), result.getMqMessage());
            }
        });
        return producer;
    }
}
```
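The general idea behind a configurable retry count can be sketched without any MQ dependency. The class below is a generic bounded-retry loop, not MQCloud's internal implementation. The names `BoundedRetry` and `Outcome` are hypothetical, and the semantics (one initial attempt followed by at most `retryTimes` retries) are an assumption; check the official wiki for the client's actual behaviour.

```java
import java.util.function.BooleanSupplier;

/** Generic bounded-retry sketch; semantics are assumed, not taken from the MQCloud client. */
final class BoundedRetry {
    private BoundedRetry() {}

    /** Outcome of a send: whether it finally succeeded and how many retries were used. */
    static final class Outcome {
        final boolean success;
        final int retriedTimes;

        Outcome(boolean success, int retriedTimes) {
            this.success = success;
            this.retriedTimes = retriedTimes;
        }
    }

    /** Runs the attempt once, then retries up to retryTimes more times until it succeeds. */
    static Outcome send(BooleanSupplier attempt, int retryTimes) {
        if (retryTimes < 0) {
            throw new IllegalArgumentException("retryTimes must not be negative");
        }
        int retried = 0;
        while (true) {
            if (attempt.getAsBoolean()) {
                return new Outcome(true, retried);
            }
            if (retried == retryTimes) {
                return new Outcome(false, retried); // retries exhausted
            }
            retried++;
        }
    }
}
```

Under these assumptions, `retryTimes: 2` means a message is attempted at most three times before the failure is reported to the caller.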
Example of sending a message (one of the two producers):

```java
@Component
@Slf4j
@SuppressWarnings("all")
public class MiddleExamAnswerProducer {

    @Autowired
    private RocketMQProducer answerProducer;

    public Result<SendResult> sendMessage(MiddleExamAnswerDto middleExamAnswerDto) {
        // Serialize once and reuse the JSON for both the log line and the message body.
        String jsonStr = JSON.toJSONString(middleExamAnswerDto);
        log.info("MQCloud:answer_message_send={}", jsonStr);
        // The user ID is passed as the second argument of publish().
        Result<SendResult> sendResult = answerProducer.publish(jsonStr, String.valueOf(middleExamAnswerDto.getUserId()));
        if (!sendResult.isSuccess) {
            log.error("MQCloud:answer_message_send failed, result={}", JSONObject.toJSONString(sendResult));
        }
        return sendResult;
    }
}
```
3. Consumer service integration
Branch: dev-mqcloud-jiangyuan
pom.xml

```xml
<!-- Goes inside <dependencies> -->
<dependency>
    <groupId>com.sohu.tv</groupId>
    <artifactId>mq-client-open</artifactId>
    <version>4.6.3</version>
</dependency>

<!-- Goes inside <repositories> -->
<repository>
    <id>sohu.nexus</id>
    <url>https://raw.github.com/sohutv/mvn_repo/master</url>
</repository>
```
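The Nacos configuration is shared with the producer (same as above, omitted).
The configuration file is shared as well (same as above, omitted).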
Configuration class:

```java
@Configuration
@Slf4j
@SuppressWarnings("all")
public class MQConfiguration {

    @Value("${xxx.mq.domain}")
    private String domain;

    @Value("${xxx.mq.middleExamAnswer.topic}")
    private String middleExamAnswerTopic;

    @Value("${xxx.mq.middleExamAnswer.consumerGroup}")
    private String middleExamAnswerConsumerGroup;

    @Value("${xxx.mq.middleExamStudentInfo.topic}")
    private String middleExamStudentInfoTopic;

    @Value("${xxx.mq.middleExamStudentInfo.consumerGroup}")
    private String middleExamStudentInfoConsumerGroup;

    // One consumer bean per topic; the callback bean is injected as a method parameter.
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public RocketMQConsumer answerConsumer(AnswerJsonConsumer callback) {
        RocketMQConsumer consumer = new RocketMQConsumer(middleExamAnswerConsumerGroup, middleExamAnswerTopic);
        consumer.setMqCloudDomain(domain);
        consumer.setConsumerCallback(callback);
        return consumer;
    }

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public RocketMQConsumer studentInfoConsumer(UserInfoSyncConsumer callback) {
        RocketMQConsumer consumer = new RocketMQConsumer(middleExamStudentInfoConsumerGroup, middleExamStudentInfoTopic);
        consumer.setMqCloudDomain(domain);
        consumer.setConsumerCallback(callback);
        return consumer;
    }
}
```
Example of consuming a message (one of the two consumers):

```java
@Component
@Slf4j
@SuppressWarnings("all")
public class AnswerJsonConsumer implements ConsumerCallback<String, MessageExt> {

    @Autowired
    private AnswerJsonService answerJsonService;

    @Override
    @SneakyThrows
    public void call(String jsonStr, MessageExt messageExt) {
        // Skip empty payloads rather than failing the consumption.
        if (StringUtils.isEmpty(jsonStr)) {
            log.info("MQCloud:consumer jsonStr is null!");
            return;
        }
        MiddleExamAnswerDto middleExamAnswerDto = BeanUtils.jsonToBean(jsonStr, MiddleExamAnswerDto.class);
        log.info("MQCloud:consumer analyAnswer===messageParamter》{}", jsonStr);
        Integer throughTypeId = middleExamAnswerDto.getThroughTypeId();
        // Levels 1 to 3 share one analysis path; level 4 has its own. Other values are ignored.
        if (ThroughTypeIdEnum.GUAN_QIA_1.getCode().equals(throughTypeId)
                || ThroughTypeIdEnum.GUAN_QIA_2.getCode().equals(throughTypeId)
                || ThroughTypeIdEnum.GUAN_QIA_3.getCode().equals(throughTypeId)) {
            answerJsonService.analyFirstThreeLevels(middleExamAnswerDto);
        } else if (ThroughTypeIdEnum.GUAN_QIA_4.getCode().equals(throughTypeId)) {
            answerJsonService.analyFourLevel(middleExamAnswerDto);
        }
    }
}
```
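The branching on `throughTypeId` in the consumer can also be expressed as a lookup, which makes the "unknown type is ignored" case explicit. The sketch below is self-contained and uses stand-ins: the enum `ThroughType`, its codes 1 to 4, and the `AnswerRouter` class are all hypothetical, since the real `ThroughTypeIdEnum` codes are not shown in this post.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical stand-in for ThroughTypeIdEnum; the codes 1..4 are assumed for illustration. */
enum ThroughType {
    GUAN_QIA_1(1), GUAN_QIA_2(2), GUAN_QIA_3(3), GUAN_QIA_4(4);

    private final int code;

    ThroughType(int code) {
        this.code = code;
    }

    /** Returns the matching constant, or null when the code is null or unknown. */
    static ThroughType fromCode(Integer code) {
        if (code == null) {
            return null;
        }
        for (ThroughType type : values()) {
            if (type.code == code) {
                return type;
            }
        }
        return null;
    }
}

/** Decides which analysis path a message should take. */
final class AnswerRouter {
    enum Handler { FIRST_THREE_LEVELS, FOURTH_LEVEL, IGNORED }

    private static final Set<ThroughType> FIRST_THREE =
            EnumSet.of(ThroughType.GUAN_QIA_1, ThroughType.GUAN_QIA_2, ThroughType.GUAN_QIA_3);

    private AnswerRouter() {}

    static Handler route(Integer throughTypeId) {
        ThroughType type = ThroughType.fromCode(throughTypeId);
        if (type == null) {
            return Handler.IGNORED; // mirrors the consumer above, which silently skips other values
        }
        return FIRST_THREE.contains(type) ? Handler.FIRST_THREE_LEVELS : Handler.FOURTH_LEVEL;
    }
}
```

Returning an explicit `IGNORED` value gives the caller a natural place to log or count messages with an unexpected type, which the original if/else chain drops without a trace.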
4. Viewing sent and consumed messages
MQ message sent successfully:
MQ message consumed successfully: