fanout模式
整体核心
01 目标
使用springboot完成rabbitmq的消费模式-Fanout
02 在配置文件中配置RabbitMq的相关信息
1 2 3 4 5 6 7 8 9
| spring: rabbitmq: username: guest password: guest virtual-host: / host: localhost port: 5672
|
03 注解方式对RabbitMq进行管理
首先,我们能够确定上面的订单业务,需要至少三个消息队列来进行接收和处理消息(微信推送同理),那么我们需要先通过Springboot,将交换机(Exchange),队列(Queue)进行声明以及绑定。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| @Configuration public class RabbitMqConfiguration {
@Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("fanout_order_exchange",true,false); }
@Bean public Queue smsQueue(){ return new Queue("sms.fanout.queue",true); }
@Bean public Queue emailQueue(){ return new Queue("email.fanout.queue",true); }
@Bean public Queue duanxinQueue(){ return new Queue("duanxin.fanout.queue",true); }
@Bean public Binding smsBinding(){ return BindingBuilder.bind(smsQueue()).to(fanoutExchange()); }
@Bean public Binding emailBinding(){ return BindingBuilder.bind(emailQueue()).to(fanoutExchange()); }
@Bean public Binding duanxinBinding(){ return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange()); } }
|
绑定完成后,当程序运行,Springboot将会在RabbitMq中自动创建一个Fanout类型的交换机以及3个队列,并将这三个消息队列和fanout_order_exchange交换机进行绑定。
04 通过Springboot向RabbitMq发送消息
我们可以通过RabbitTemplate类,向队列中发送消息,下来我们用代码进行模拟用户下单进行的操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Service public class OrderService {
@Autowired private RabbitTemplate rabbitTemplate;
public void makeOrder(String userid , String productId , int num ){
String orderId = UUID.randomUUID().toString(); System.out.println("订单生产成功: " + orderId); String exchangeName = "fanout_order_exchange"; String routingKey = ""; rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId); } }
|
05 通过Springboot来接收Queue中的消息
首先我们先要对一个类用注解进行说明,说明该类用来监听哪些队列中的消息。注意:该类必须被spring容器进行管理,不然无法接收到RabbitMq中的消息。
1 2 3 4 5 6 7 8 9 10
| @RabbitListener(queues = {"duanxin.fanout.queue"}) @Component public class FanoutDuanxinConsumer {
@RabbitHandler public void reviceMessage(String message){ System.out.println("duanxin fanout -- 接收到了订单信息是: ->" + message); } }
|
运行结果如下图所示:
direct模式
还是上面那个案例,Direct模式的具体实现方法,Springboot也有进行提供,比如Direct模式的交换机(DirectExchange),还有绑定的方法。其实现方式与Fanout模式类同。
具体实现如以下代码所示:
Direct模式配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| @Configuration public class DirectRabbitMqConfiguration {
@Bean public DirectExchange directExchange(){ return new DirectExchange("direct_order_exchange",true,false); }
@Bean public Queue smsDirectQueue(){ return new Queue("sms.direct.queue",true); }
@Bean public Queue emailDirectQueue(){ return new Queue("email.direct.queue",true); }
@Bean public Queue duanxinDirectQueue(){ return new Queue("duanxin.direct.queue",true); }
@Bean public Binding smsDirectBinding(){ return BindingBuilder.bind(smsDirectQueue()).to(directExchange()).with("sms"); }
@Bean public Binding emailDirectBinding(){ return BindingBuilder.bind(emailDirectQueue()).to(directExchange()).with("email"); }
@Bean public Binding duanxinDirectBinding(){ return BindingBuilder.bind(duanxinDirectQueue()).to(directExchange()).with("duanxin"); } }
|
向Direct模式交换机发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
public void makeDirectOrder(String userid , String productId , int num ,String routingKey){
String orderId = UUID.randomUUID().toString(); System.out.println("订单生产成功: " + orderId); String exchangeName = "direct_order_exchange"; rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId); }
|
Direct模式交换机接收消息
1 2 3 4 5 6 7 8 9
| @Component @RabbitListener(queues = {"duanxin.direct.queue"}) public class DirectDuanxinConsumer {
@RabbitHandler public void reviceMessage(String message){ System.out.println("Duanxin direct -- 接收到了订单信息是: ->" + message); } }
|
测试类与处理结果
测试类
1 2 3 4 5
| @Test public void testDirect(){ orderService.makeDirectOrder("1","1",10,"sms"); orderService.makeDirectOrder("1","1",10,"email"); }
|
处理结果
topic模式
交换机和队列的绑定可以使用注解的方式直接进行绑定,我们现在用topic模式来进行演示这一方式的绑定。
使用注解方式绑定交换机和队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
@Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "sms.topic.queue",durable = "true" ,autoDelete = "false"), exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC), key = "#.sms.#" )) public class TopicSMSConsumer {
@RabbitHandler public void reviceMessage(String message){ System.out.println("SMS topic -- 接收到了订单信息是: ->" + message); } }
|