Commit 088becef by 赵剑炜

添加两个交换机主题

parent 721d2f18
//package com.junmp.jyzb.config.rabbitMQ;
//
//import com.junmp.jyzb.api.bean.dto.OrderDto;
//import com.junmp.jyzb.service.OrderMainService;
//import org.springframework.amqp.core.*;
//import org.springframework.beans.factory.annotation.Qualifier;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//
//import javax.annotation.Resource;
//import java.util.ArrayList;
//import java.util.List;
//
//@Configuration
//public class RabbitmqConfig {
// @Resource
// private OrderMainService orderMainService;
//
// //1.交换机
// @Bean("LogExchange")
// public Exchange bootExchange(){
// return ExchangeBuilder.directExchange("LogExchange").durable(true).build();
// }
//
// //2.队列
// @Bean("LogQueue")
// public Queue bootQueue(){
// return QueueBuilder.durable("LogQueue").build();
// }
// //3.队列和交换机的绑定关系(队列,交换机,rountingkey)
// @Bean
// public Binding bingQueueExchange(@Qualifier("LogQueue") Queue queue, @Qualifier("LogExchange")Exchange exchange){
// return BindingBuilder.bind(queue).to(exchange).with("log").noargs();
//
// }
//
// @Bean
// public DirectExchange OrderExchange(){
// return new DirectExchange("OrderExchange");
// }
//
// //2.队列
// @Bean
// public List<Queue> OrderQueue(){
// List<Queue> queues =new ArrayList<>();
// List<OrderDto> orderList = orderMainService.getOrder();
// for (OrderDto orderDto:orderList) {
// queues.add(new Queue(orderDto.getEndOrgName()));
// }
// return queues;
// }
//
// //3.队列和交换机的绑定关系(队列,交换机,rountingkey)
// @Bean
// public List<Binding> bindQueueExchange(List<Queue> queue, DirectExchange exchange){
// List<Binding> bindings=new ArrayList<>();
// List<OrderDto> orderList = orderMainService.getOrder();
// for(int i=0;i<Math.min(queue.size(),orderList.size());i++){
// bindings.add(BindingBuilder.bind(queue.get(i)).to(exchange).with(orderList.get(i).getEndOrgId()));
// }
// return bindings;
//
// }
//
//}
package com.junmp.jyzb.config.rabbitMQ;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.junmp.jyzb.entity.Cabinet;
import com.junmp.jyzb.entity.PubOrg;
import com.junmp.jyzb.service.CabinetService;
import com.junmp.jyzb.service.PubOrgService;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
@Configuration
public class TopicRabbitConfig {
private static final String EXCHANGE = "topicExchange";
private static final String ORG_ROUTING_KEY_PREFIX = "org.";
private static final String CABINET_ROUTING_KEY_PREFIX = "cabinet.";
@Resource
private PubOrgService pubOrgService;
@Resource
private CabinetService cabinetService;
// Inject the services using constructor injection
public TopicRabbitConfig() {
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(EXCHANGE);
}
// Dynamic creation of queues and bindings for each organization and cabinet
@Bean
public List<Binding> createQueuesAndBindings(TopicExchange topicExchange) {
List<PubOrg> orgList = pubOrgService.list(new LambdaQueryWrapper<PubOrg>().eq(PubOrg::getStatusFlag, 1));
List<Cabinet> cabinetList = cabinetService.list();
List<Binding> bindings = new ArrayList<>();
for (PubOrg org : orgList) {
// Create a unique queue for each organization
Queue orgQueue = new Queue(org.getOrgId().toString());
// Bind the queue to the exchange with the routing key specific to the organization
Binding orgBinding = BindingBuilder.bind(orgQueue).to(topicExchange)
.with(ORG_ROUTING_KEY_PREFIX + org.getOrgId());
bindings.add(orgBinding);
}
for (Cabinet cabinet : cabinetList) {
// Create a unique queue for each cabinet
Queue cabinetQueue = new Queue(cabinet.getCabinetNum().toString());
// Bind the queue to the exchange with the routing key specific to the cabinet
Binding cabinetBinding = BindingBuilder.bind(cabinetQueue).to(topicExchange)
.with(CABINET_ROUTING_KEY_PREFIX + cabinet.getCabinetNum());
bindings.add(cabinetBinding);
}
return bindings;
}
}
......@@ -33,6 +33,17 @@ public class RabbitMQSendMsg {
rabbitTemplate.convertAndSend(exchangeName, name, jsonString);
}
// 发布消息到交换机,根据不同的主题和消息内容
public void publishOrgMessage(String orgId, String message) {
String routingKey = "org." + orgId;
rabbitTemplate.convertAndSend("topicExchange", routingKey, message);
}
public void publishCabinetMessage(String cabinetId, String message) {
String routingKey = "cabinet." + cabinetId;
rabbitTemplate.convertAndSend("topicExchange", routingKey, message);
}
//推送消息(广播式推送)
public void sendFanoutMsg(String exchangeName, List<String> names,Object msg){
//exchangeName交换机名称,name单警柜或本地仓库,msg发送的消息
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论