02-应对消息丢失、重复、顺序与积压的全面策略

image-20241219171829514

引言

在分布式系统架构中,消息队列是实现组件间通信解耦、增强系统可扩展性与可靠性的重要工具。不过,在实际应用中,我们也会面临一些挑战,比如:

  1. 消息丢失:这可能是由于网络故障、服务宕机或配置错误等原因造成的。确保消息持久化、采用可靠的消息确认机制以及设置合理的超时重试策略可以有效减少消息丢失。
  2. 重复消费:网络波动或其他异常情况下,同一消息可能会被多次投递给消费者。通过实现幂等性接口,即确保相同操作执行多次的结果与执行一次相同,可以避免因重复消费带来的负面影响。
  3. 顺序消费:当业务逻辑要求消息按照特定顺序处理时,若消息队列无法保证消息的有序传递,则可能导致数据不一致等问题。选择支持有序消息的队列产品,或者设计业务逻辑以容忍一定程度上的无序,都是可行的解决方案。
  4. 消息积压:如果消费者处理速度跟不上生产者的发送速率,就会导致消息积压。优化消费者性能、增加消费者实例数量、合理调整队列的预取参数等措施有助于缓解这一情况。

kafka

消息丢失

  • 消息生产者发送消息给Broker的时候数据丢失
  • Broker异常导致Broker中的数据丢失
  • 消息消费者消费异常导致消息丢失

生产者

Kafka集群(其实是分区的Leader)会返回一个ACK来确认Producer推送消息的结果,Kafka提供了三种模式:

  • NoResponse RequiredAcks = 0:这个代表的就是不进行消息推送是否成功的确认。
  • WaitForLocal RequiredAcks = 1:当local(Leader)确认接收成功后,就可以返回了。
  • WaitForAll RequiredAcks = -1:当所有的Leader和Follower都接收成功时,才会返回。

因此这个配置的影响也分为下面三种情况:

  • 设置为0,Producer不进行消息发送的确认,Kafka集群(Broker)可能由于一些原因并没有收到对应消息,从而引起消息丢失。
  • 设置为1,Producer在确认到 Topic Leader 已经接收到消息后,完成发送,此时有可能 Follower 并没有接收到对应消息。此时如果 Leader 突然宕机,在经过选举之后,没有接到消息的 Follower 晋升为 Leader,从而引起消息丢失。
  • 设置为-1,可以很好的确认Kafka集群是否已经完成消息的接收和本地化存储,并且可以在Producer发送失败时进行重试。

消费者

Kafka消费者默认使用自动提交偏移量的功能,当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期由客户端参数auto.commit.interval.ms配置,默认5S,此参数生效的前提是enable.auto.commit参数为true。

自动提交虽然简单,但可能会造成消息丢失,比如消费者刚拉取了一批消息,然后刚好到达了提交位移的时间,刚才的消息位移就提交了,但是消费者此时出现了故障,消息还未来得及处理,这样消费者重启后就会出现消息丢失。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Configuration
@EnableKafka
public class ManualConsumerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;

@Value("${kafka.topic.manual}")
private String topic;

/**
* RECORD:每处理一条commit一次
* BATCH(默认):每次poll的时候批量提交一次,频率取决于每次poll的调用频率
* TIME:每次间隔ackTime的时间去commit
* COUNT:累积达到ackCount次的ack去commit
* COUNT_TIME:ackTime或ackCount哪个条件先满足,就commit
* MANUAL:listener负责ack,但是背后也是批量上去
* MANUAL_IMMEDIATE:listner负责ack,每调用一次,就立即commit
**/
@Bean
public KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-group");
// 手动提交
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

return factory;
}
}
@Component
public class ManualConsumer {

@KafkaListener(topics = "${kafka.topic.manual}", containerFactory = "manualKafkaListenerContainerFactory")
public void receive(@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
Consumer consumer,
Acknowledgment ack) {
System.out.println(String.format("From partition %d : %s", partition, message));
// 同步提交
consumer.commitSync();
}
}

重复消费

解决重复消费的方案就是保证接口的幂等性 ,幂等性是指对于同一个操作,无论执行多少次,结果都是一致的。在消息中间件中,为了保证可靠性,需要提供幂等性接口来避免重复消费消息。常用的保证幂等性的方法是使用全局唯一ID或幂等令牌(Idempotency Key)。全局唯一ID或幂等令牌是一种唯一标识,用于标识一次操作的唯一性,如果重复执行同一个操作,只会产生一次结果。例如,在分布式系统中,可以使用全局唯一ID来保证幂等性。

顺序消费

Kafka中的 每个分区中的消息是天生保证顺序消费的,所以我们只需要保证同一个业务下的消息发送到同一个分区就可以,这里Kafka中已经帮我们实现好了,只需要投递消息都设置相同的key的情况下都会存放到同一个分区中。

1
2
3
4
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, key, data);
return this.doSend(producerRecord);
}

消息积压

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1)batch.memory修改缓冲区大小

设置发送消息的缓冲区,默认值是33554432,就是32MB

如果发送消息出去的速度小于写入消息进去的速度,就会导致缓冲区写满,此时生产消息就会阻塞住,所以说这里就应该多做一些压测,尽可能保证说这块缓冲区不会被写满导致生产行为被阻塞住。

2)compression.type压缩格式

默认是none,不压缩,但是也可以使用lz4压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大producer端的cpu开销。

3)batch.size批次大小

设置merge batch合并批次消息的大小

如果 batch 批次太小,会导致频繁网络请求,吞吐量下降;

如果batch批次太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里。

默认值是:16384,就是16kb,也就是一个batch批次满了16kb就发送出去,一般在实际生产环境,这个batch批次的值可以增大一些来提升吞吐量,可以自己压测一下。

4)linger.ms等待时长

这个值默认是0,意思就是消息必须立即被发送,但是这是不对的。

一般设置一个100毫秒之类的,这样的话就是说,这个消息被发送出去后进入一个batch批次,如果100毫秒内,这个batch批次满了16kb,自然就会发送出去。

但是如果100毫秒内,batch没满,那么也必须把消息发送出去了,不能让消息的发送延迟时间太长,也避免给内存造成过大的一个压力。

消费者

  • 增加消费者并发处理能力
    • 增加消费者数量:通过增加消费者实例数量,分散处理压力。Kafka消费者组内的每个消费者可以从不同的分区并行消费消息。如果当前分区数较多,但消费者数量较少,增加消费者可以提高处理速度。
    • 增加分区数量:如果消息的生产速率非常高且单个消费者处理能力有限,可以通过增加分区的数量来提升并发性。每个分区可以对应一个消费者,使得更多消费者能够同时处理消息。

注意:分区的数量应该和消费者数量相匹配,每个分区只能被一个消费者消费,多增加的消费者无法分配到分区。

  • 提升消费者的消费能力
    • 批量消费:通过批量获取消息,而不是逐条消费,可以显著提升消费性能。调整消费者的批量拉取大小(max.poll.records)来提高每次拉取的消息量。
    • 异步处理:让消费者异步处理消息,而不是同步处理。例如,处理过程中可以将消息放入一个任务队列,然后由后台线程或其他服务处理。
    • 优化消费者逻辑:分析消费者的业务逻辑,优化耗时操作(如数据库操作、IO操作等)。例如,使用批量插入数据库或优化网络通信等。

其他

数据积压可能是我们在编写代码处理逻辑的时候,代码质量不高,处理速度慢导致消费数据的性能低,可以优化代码。

RabbitMQ

消息丢失

  • 生产者弄丢了数据,生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。
  • RabbitMQ 弄丢了数据,MQ还没有持久化自己挂了
  • 消费端弄丢了数,。刚消费到,还没处理,结果进程挂了,比如重启了。

生产者

为了避免因为网络故障或闪断问题导致消息无法正常发送到 RabbitMQ Server 的情况,RabbitMQ 提供了两种方案让生产者可以感知到消息是否正确无误的发送到 RabbitMQ Server中,这两种方案分别是事务机制发送方确认机制

事务机制,保证生产者发送消息到 RabbitMQ Server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Configuration
public class RabbitMQConfig {
/**
* 配置事务管理器
*/
@Bean
public RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
}
@Service
public class RabbitMQServiceImpl {
@Autowired
private RabbitTemplate rabbitTemplate;

@Transactional // 事务注解
public void sendMessage() {
// 开启事务
rabbitTemplate.setChannelTransacted(true);
// 发送消息
rabbitTemplate.convertAndSend(RabbitMQConfig.Direct_Exchange, routingKey, message);
}
}
发送方确认机制,保证消息能从交换机路由到指定队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启发送方确认机制
publisher-returns: true # 开启消息返回
template:
mandatory: true # 消息投递失败返回客户端
@Component
@Slf4j
public class RabbitConfirmConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {

@Autowired
private RabbitTemplate rabbitTemplate;

@PostConstruct
public void initRabbitTemplate(){
rabbitTemplate.setConfirmCallback(this);
//设置回退消息交给谁处理
rabbitTemplate.setReturnsCallback(this);
}


@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id=correlationData!=null?correlationData.getId():"";
if(ack){
log.info("交换机已经收到 id 为:{}的消息",id);
}else{
log.info("交换机还未收到 id 为:{}消息,原因:{}",id,cause);
}
}

//当消息无法路由的时候的回调方法
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
Message message = returnedMessage.getMessage();
String exchange = returnedMessage.getExchange();
String replyText = returnedMessage.getReplyText();
final String routingKey = returnedMessage.getRoutingKey();
log.error(" 消息 {}, 被交换机 {} 退回,退回原因 :{}, 路由 key:{}",new String(message.getBody()),exchange,replyText,routingKey);
}
}
保证消息在 RabbitMQ Server 中的持久化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 消息队列
*/
@Bean
public Queue queue() {
// 四个参数:name(队列名)、durable(持久化)、 exclusive(独占)、autoDelete(自动删除)
return new Queue(MESSAGE_QUEUE, true);
}

/**
* 直接交换机
*/
@Bean
public DirectExchange exchange() {
// 四个参数:name(交换机名)、durable(持久化)、autoDelete(自动删除)、arguments(额外参数)
return new DirectExchange(Direct_Exchange, true, false);
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启发送方确认机制
publisher-returns: true # 开启消息返回
template:
mandatory: true # 消息投递失败返回客户端
listener:
simple:
acknowledge-mode: manual # 开启手动确认消费机制
# retry:
# enabled: true # 是否开启消费者重试(为false时关闭消费者重试,意思不是不重试,而是一直收到消息直到ack确认或者一直到超时)
# max-attempts: 5 # 最大重试次数,代码中不能使用try/catch捕获异常,否则重试机制失效
# max-interval: 5000 # 重试间隔时间(单位毫秒)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Slf4j
@Component
public class ReceiveHandler {
@Autowired
private RabbitTemplate rabbitTemplate;

//监听user队列
@RabbitListener(queues = {RabbitmqConfig.QUEUE_USER})
public void receive_user(String msg, Message message, Channel channel) throws Exception{
String msg_id = message.getMessageProperties().getMessageId();
// TODO 判断消息是否被消费
try {
int i = 1/0;
/**
* channel.basicAck(deliveryTag,multiple)
* deliveryTag:消息的index
* multiple:是否批量-true将一次性确认ack小于deliveryTag的消息
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

// TODO 被消费存到一个地方
}catch (Exception e){
/**
* channel.basicNack(deliveryTag,multiple,requeue)
* deliveryTag:消息的index
* multiple:是否批量-true将一次性拒绝所有ack小于deliveryTag的消息
* requeue:是否重新入队列
*/
if(CommonConstants.mess_id_map.getOrDefault(msg_id,0) <= 3){
Integer count = CommonConstants.mess_id_map.getOrDefault(msg_id,1);
CommonConstants.mess_id_map.put(msg_id,++count);
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
log.info("当前时间:{},次数:{}", new Date(),count);
}else{
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
rabbitTemplate.convertAndSend(RabbitmqConfig.DELAYED_EXCHANGE_USER, "delayed.user", msg,
correlationData ->{
correlationData.getMessageProperties().setDelay(1000 * 10);
return correlationData;
});
}
// 触发重试机制
// throw e;
}
}

@RabbitListener(queues = {RabbitmqConfig.DELAYED_QUEUE_USER})
public void receive_delayed_user(String msg, Message message, Channel channel) throws Exception{
log.info("延迟队列:当前时间:{},消息:{}", new Date(),msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}

重复消费

解决重复消费的方案就是保证接口的幂等性 ,幂等性是指对于同一个操作,无论执行多少次,结果都是一致的。在消息中间件中,为了保证可靠性,需要提供幂等性接口来避免重复消费消息。常用的保证幂等性的方法是使用全局唯一ID或幂等令牌(Idempotency Key)。全局唯一ID或幂等令牌是一种唯一标识,用于标识一次操作的唯一性,如果重复执行同一个操作,只会产生一次结果。例如,在分布式系统中,可以使用全局唯一ID来保证幂等性。

顺序消费

  1. 单活模式队列来保证每个队列存在多个消费者实例,但是只会有一个实例起作用
1
2
3
4
5
6
private Queue creatQueue(String name) {
// 创建一个 单活模式 队列
HashMap<String, Object> args = new HashMap<>();
args.put("x-single-active-consumer", true);
return new Queue(name, true, false, false, args);
}
  1. 利用消息的路由键(Routing Key):将相关的消息通过相同的路由键发送到同一个队列,从而确保消息在队列中保持顺序。
  2. 基于spring-cloud-starter-stream-rabbit实现分区消费。

消息积压

  • 增加消费者数量:通过增加消费者的数量来提高消息的处理速度。可以根据系统的负载情况动态地增加或减少消费者的数量。
  • 提高消费者的处理能力:可以通过优化消费者的代码逻辑、提升消费者的性能等方式来提高消费者的处理能力,从而加快消息的消费速度。
  • 增加消息队列的吞吐量:可以通过增加消息队列的并发处理能力来提高消息的处理速度。可以考虑增加队列的分区、增加消息的分发策略等方式来提高消息队列的吞吐量。
  • 设置消息的过期时间:可以设置消息的过期时间,当消息在队列中超过一定时间还未被消费时,可以将其丢弃或进行其他处理,避免消息积压。
  • 使用延迟队列:可以使用延迟队列来实现消息的延时处理,将消息发送到延迟队列中,然后在指定的时间后再进行消费,从而避免消息的积压。

RokectMQ

消息丢失

  • 消息生产者将消息发送到RocketMQ Broker的这个过程可能出现消息丢失。
  • RocketMQ Broker接收到生产者发送的消息存储的过程消息可能丢失。
  • 消费者处理失败,但是将错误进行捕捉,导致消息出现虚假的消费成功。实际上没有消费,但是在MQ看来消费完成了消费。

生产者

提供SYNC的发送消息方式,等待broker处理结果

RocketMQ提供了3种发送消息方式,分别是:

  • 同步发送:Producer 向 broker 发送消息,阻塞当前线程等待 broker 响应发送结果。
  • 异步发送:Producer 首先构建一个向 broker 发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果。
  • Oneway发送:Oneway 方式只负责发送请求,不等待应答,Producer只负责把请求发出去,而不处理响应结果。

我们在调用producer.send方法时,不指定回调方法,则默认采用同步发送消息的方式,这也是丢失几率最小的一种发送方式。

发送消息如果失败或者超时,则重新发送。
broker提供多master模式,即使某台broker宕机了,保证消息可以投递到另外一台正常的broker上。
1
2
3
4
5
6
// 同步设置重试次数
producer.setRetryTimesWhenSendFailed(3)
// 异步设置重试次数
producer.setRetryTimesWhenSendAsyncFailed(3);
// 如果发送失败,是否尝试发送到其他 Broker 节点
producer.setRetryAnotherBrokerWhenNotStoreOK(true);

消费者

开启手动ACK机制,当消费者成功将消息消费后给RocketMQ发送一个ACK消息,只有当RocketMQ接收到后才会确认消息是被成功消费了

重复消费

解决重复消费的方案就是保证接口的幂等性 ,幂等性是指对于同一个操作,无论执行多少次,结果都是一致的。在消息中间件中,为了保证可靠性,需要提供幂等性接口来避免重复消费消息。常用的保证幂等性的方法是使用全局唯一ID或幂等令牌(Idempotency Key)。全局唯一ID或幂等令牌是一种唯一标识,用于标识一次操作的唯一性,如果重复执行同一个操作,只会产生一次结果。例如,在分布式系统中,可以使用全局唯一ID来保证幂等性。

顺序消费

RocketMQ 通过将消息发送到不同的分区(Partition)来保证顺序性。在 RocketMQ 中,消息的顺序性是通过相同的消息 key(通常是业务唯一标识,例如订单 ID)发送到相同的队列(Queue)来实现的。

消息积压

可参考RabbitMQ

总结

对于大多数消息队列在解决消息丢失、重复消费、顺序消费和消息积压这四个经典问题上的基本思路是相似的:

消息丢失

通用解决思路:

  • 持久化存储:确保消息被可靠地写入磁盘。
  • 高可用架构:通过主从复制、集群部署等手段提高系统的容错能力。
  • 确认机制:生产者与消费者端都应有消息发送与消费后的确认机制。
  • 重试策略:设置合理的重试机制来处理临时性故障。
  • 监控与报警:实时监控系统状态,及时发现并响应潜在的问题。

补充完善:

  • 数据备份与恢复:定期进行数据备份,并制定灾难恢复计划。
  • 日志记录与审计:详细记录消息操作日志,便于事后追踪与审计。
  • 跨数据中心同步:对于关键业务,考虑跨多个数据中心的数据同步以增加冗余度。

重复消费

通用解决思路:

  • 幂等性设计:保证同一消息多次处理结果一致。
  • 唯一标识符:为每条消息分配唯一ID用于查重。
  • 分布式锁或屏障:防止同一时间点多个实例处理同一条消息。

补充完善:

  • 消费历史跟踪:利用缓存(如Redis)保存已成功消费的消息ID,避免重复处理。
  • 延迟队列:对于可能存在的短暂网络抖动导致的重复推送,可以使用延迟队列让消息稍后再尝试消费。

顺序消费

通用解决思路:

  • 分区/分组:按一定规则对消息进行分区,保证同一分区内消息有序。
  • 单线程处理:确保同一类消息由单一消费者线程处理以维持顺序。
  • 事务支持:对于需要严格顺序的应用场景,结合事务消息功能。

补充完善:

  • 优先级队列:对于不同优先级的消息流,可采用多级队列管理,确保高优先级消息先被处理。
  • 轻量级排序算法:在某些情况下,可以在内存中实现简单的排序逻辑,以最小化性能影响的同时保持部分顺序。

消息积压

通用解决思路:

  • 水平扩展:增加消费者数量以提升处理速度。
  • 预取计数调整:合理配置预取参数,平衡消费者负载。
  • 异步处理:尽可能采用异步方式加速消息处理流程。

补充完善:

  • 流量控制:引入限流措施,防止生产者过快发送消息导致消费者无法跟上。
  • 弹性伸缩:根据实际负载动态调整消费者的规模。
  • 死信队列:对于长期未能处理的消息,将其移至死信队列以便后续分析和处理。
  • 预警机制:当检测到消息积压时,提前发出警告,使运维人员能够及时介入。

02-应对消息丢失、重复、顺序与积压的全面策略
https://janycode.github.io/2024/12/05/15_分布式/06_分布式消息队列/02-应对消息丢失、重复、顺序与积压的全面策略/
作者
Jerry(姜源)
发布于
2024年12月5日
许可协议