loading...
Springboot整合RabbitMQ的实现方式
Published in:2022-01-31 | category: RabbitMQ
Words: 1.2k | Reading time: 5min | reading:

fanout模式

整体核心

RF4Ei4.png

01 目标

使用springboot完成rabbitmq的消费模式-Fanout

REcFQx.png

02 在配置文件中配置RabbitMq的相关信息

1
2
3
4
5
6
7
8
9
# 配置rabbitmq服务
spring:
rabbitmq:
username: guest
password: guest
virtual-host: /
host: localhost
port: 5672

03 注解方式对RabbitMq进行管理

首先,我们能够确定上面的订单业务,需要至少三个消息队列来进行接收和处理消息(微信推送同理),那么我们需要先通过Springboot,将交换机(Exchange),队列(Queue)进行声明以及绑定。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Configuration
public class RabbitMqConfiguration {

// 1: 声明注册fanout模式的交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanout_order_exchange",true,false);
}


// 2: 声明队列sms.fanout.queue email.fanout.queue duanxin.fanout.queue
@Bean
public Queue smsQueue(){
return new Queue("sms.fanout.queue",true);
}

@Bean
public Queue emailQueue(){
return new Queue("email.fanout.queue",true);
}

@Bean
public Queue duanxinQueue(){
return new Queue("duanxin.fanout.queue",true);
}

// 3: 完成绑定关系(队列和交换机完成绑定关系)
@Bean
public Binding smsBinding(){
return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
}

@Bean
public Binding emailBinding(){
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
}

@Bean
public Binding duanxinBinding(){
return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange());
}
}

绑定完成后,当程序运行,Springboot将会在RabbitMq中自动创建一个Fanout类型的交换机以及3个队列,并将这三个消息队列和fanout_order_exchange交换机进行绑定。

04 通过Springboot向RabbitMq发送消息

​ 我们可以通过RabbitTemplate类,向队列中发送消息,下来我们用代码进行模拟用户下单进行的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Service
public class OrderService {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 派单方法
* 模拟用户下单
* @param userid
* @param productId
* @param num
*/
public void makeOrder(String userid , String productId , int num ){
// 1: 根据商品Id查询库存是否充足

// 2: 保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("订单生产成功: " + orderId);
// 3: 通过消息队列完成消息的分发
// 参数1: 交换机 参数2: 路由key/queue队列名称 参数3: 消息内容
String exchangeName = "fanout_order_exchange";
String routingKey = "";
rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);
}
}

05 通过Springboot来接收Queue中的消息

首先我们先要对一个类用注解进行说明,说明该类用来监听哪些队列中的消息。注意:该类必须被spring容器进行管理,不然无法接收到RabbitMq中的消息。

1
2
3
4
5
6
7
8
9
10
// 这个注解说明该类用于监听哪个队列中的消息
@RabbitListener(queues = {"duanxin.fanout.queue"})
@Component
public class FanoutDuanxinConsumer {

@RabbitHandler
public void reviceMessage(String message){
System.out.println("duanxin fanout -- 接收到了订单信息是: ->" + message);
}
}

运行结果如下图所示:

REf7YF.png

direct模式

还是上面那个案例,Direct模式的具体实现方法,Springboot也有进行提供,比如Direct模式的交换机(DirectExchange),还有绑定的方法。其实现方式与Fanout模式类同。

具体实现如以下代码所示:

Direct模式配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Configuration
public class DirectRabbitMqConfiguration {

// 1: 声明注册direct模式的交换机
@Bean
public DirectExchange directExchange(){
return new DirectExchange("direct_order_exchange",true,false);
}


// 2: 声明队列sms.direct.queue email.direct.queue duanxin.direct.queue
@Bean
public Queue smsDirectQueue(){
return new Queue("sms.direct.queue",true);
}

@Bean
public Queue emailDirectQueue(){
return new Queue("email.direct.queue",true);
}

@Bean
public Queue duanxinDirectQueue(){
return new Queue("duanxin.direct.queue",true);
}

// 3: 完成绑定关系(队列和交换机完成绑定关系)
@Bean
public Binding smsDirectBinding(){
return BindingBuilder.bind(smsDirectQueue()).to(directExchange()).with("sms");
}

@Bean
public Binding emailDirectBinding(){
return BindingBuilder.bind(emailDirectQueue()).to(directExchange()).with("email");
}

@Bean
public Binding duanxinDirectBinding(){
return BindingBuilder.bind(duanxinDirectQueue()).to(directExchange()).with("duanxin");
}
}

向Direct模式交换机发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 派单方法(Direct)
* 模拟用户下单
* @param userid
* @param productId
* @param num
*/
public void makeDirectOrder(String userid , String productId , int num ,String routingKey){
// 1: 根据商品Id查询库存是否充足

// 2: 保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("订单生产成功: " + orderId);
// 3: 通过消息队列完成消息的分发
// 参数1: 交换机 参数2: 路由key/queue队列名称 参数3: 消息内容
String exchangeName = "direct_order_exchange";
rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);
}

Direct模式交换机接收消息

1
2
3
4
5
6
7
8
9
@Component
@RabbitListener(queues = {"duanxin.direct.queue"})
public class DirectDuanxinConsumer {

@RabbitHandler
public void reviceMessage(String message){
System.out.println("Duanxin direct -- 接收到了订单信息是: ->" + message);
}
}

测试类与处理结果

测试类

1
2
3
4
5
@Test
public void testDirect(){
orderService.makeDirectOrder("1","1",10,"sms");
orderService.makeDirectOrder("1","1",10,"email");
}

处理结果

RVRcIU.png

topic模式

交换机和队列的绑定可以使用注解的方式直接进行绑定,我们现在用topic模式来进行演示这一方式的绑定。

使用注解方式绑定交换机和队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

/**
#:代表可以存在0个,1个,多个
*:代表必须存在1个
**/
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "sms.topic.queue",durable = "true" ,autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
key = "#.sms.#"
))
public class TopicSMSConsumer {

@RabbitHandler
public void reviceMessage(String message){
System.out.println("SMS topic -- 接收到了订单信息是: ->" + message);
}
}
Prev:
Next:
Oracle常用命令
catalog
catalog