RabbitMq相关
# 死信队列
# 概念
死信(Dead Letter) 是 rabbitmq 提供的一种机制。当一条消息满足下列条件之一那么它会成为死信:
- 消息被否定确认(如channel.basicNack) 并且此时requeue 属性被设置为false。
- 消息在队列的存活时间超过设置的TTL时间
- 消息队列的消息数量已经超过最大队列长度
若配置了死信队列,死信会被 rabbitmq 投到死信队列中。
在 rabbitmq 中创建死信队列的操作流程大概是:
- 创建一个交换机作为死信交换机
- 在业务队列中配置 x-dead-letter-exchange 和 x-dead-letter-routing-key,将第一步的交换机设为业务队列的死信交换机
- 在死信交换机上创建队列,并监听此队列
死信队列的设计目的是为了存储没有被正常消费的消息,便于排查和重新投递。死信队列同样也没有对投递时间做出保证,在第一条消息成为死信之前,后面的消息即使过期也不会投递为死信。
为了解决这个问题,rabbit 官方推出了延迟投递插件 rabbitmq-delayed-message-exchange ,推荐使用官方插件来做延时消息。
# 配置死信队列
# demo
demo-xxx-notify-input:
consumer:
max-concurrency: 10 #最大并发
prefetch: 2 #预取数量
auto-bind-dlq: true #开启DLQ(死信队列)
republish-to-dlq: true #设置为true的时候,那么该消息在进入到死信队列的时候,会在headers中加入错误信息
1
2
3
4
5
6
7
2
3
4
5
6
7
# 延时队列
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列
# 场景
使用场景
订单在十分钟之内未支付则自动取消。
新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
用户注册成功后,如果三天内没有登陆则进行短信提醒。
用户发起退款,如果三天内没有得到处理则通知相关运营人员。
预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。
# 生产者
延时队列需在生产者设置header
x-delay
//延时5分钟推送
Message<StkInFinishPushDTO> message = MessageBuilder.withPayload(stkInFinishPushDTO).setHeader("x-delay",300000).build();
outputInterface.headingStkinFinishOutput().send(message);
1
2
3
2
3
# yaml配置
# 入库定单完成接口推送
heading-stkin-finish-output:
producer:
delayedExchange: true # 生产者开启延时队列
heading-stkin-finish-input:
consumer:
delayedExchange: true #消费者开启延时队列
concurrency: 4 #并发数
max-concurrency: 8 #最大并发数
prefetch: 10 #预取数
auto-bind-dlq: true # 开启死信队列
republish-to-dlq: true # 死信错误详情
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# 配置样例
spring:
cloud:
stream:
rabbit:
bindings:
# demo
heading-wms-notify-input:
consumer:
max-concurrency: 10
prefetch: 2
auto-bind-dlq: true
republish-to-dlq: true
# 门店资料消息消费
heading-wms-store-input:
consumer:
max-concurrency: 10
prefetch: 2
auto-bind-dlq: true
republish-to-dlq: true
# 商品资料消息
heading-wms-product-input:
consumer:
max-concurrency: 10
prefetch: 2
auto-bind-dlq: true
republish-to-dlq: true
# 入库定单完成接口推送
heading-stkin-finish-output:
producer:
delayedExchange: true
heading-stkin-finish-input:
consumer:
delayedExchange: true
concurrency: 4
max-concurrency: 8
prefetch: 10
auto-bind-dlq: true
republish-to-dlq: true
bindings:
# demo
heading-wms-notify-input:
destination: heading-wms-notify
binder: mq-default
group: demo-thirdparty
consumer:
concurrency: 2
# 门店资料消息
heading-wms-store-input:
destination: heading-wms-store
binder: mq-default
group: demo-thirdparty
consumer:
concurrency: 2
# 商品资料消息
heading-wms-product-input:
destination: heading-wms-product
binder: mq-default
group: demo-thirdparty
consumer:
concurrency: 2
# 入库定单完成接口推送
heading-stkin-finish-input:
destination: heading-stkin-finish
binder: mq-default
group: heading-stkin-finish-group
consumer:
concurrency: 2
# 单据通知消息
heading-wms-notify-output:
destination: heading-wms-notify
binder: mq-default
# 商品资料消息
heading-wms-product-output:
destination: heading-wms-product
binder: mq-default
# 门店资料消息
heading-wms-store-output:
destination: heading-wms-store
binder: mq-default
# 入库定单完成接口推送
heading-stkin-finish-output:
destination: heading-stkin-finish
binder: mq-default
defaultBinder: mq-default
binders:
mq-default:
type: rabbit
environment:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: 123456
virtual-host: /demo-thirdparty
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
上次更新: 2022/08/30, 13:59:56