Commit 8cae604f by 赵剑炜

增加消息队列

parent 67d1bc33
......@@ -16,7 +16,9 @@
<module name="jyzb-api" />
</profile>
</annotationProcessing>
<bytecodeTargetLevel target="9" />
<bytecodeTargetLevel target="9">
<module name="jyzb-mq" target="1.8" />
</bytecodeTargetLevel>
</component>
<component name="JavacSettings">
<option name="ADDITIONAL_OPTIONS_OVERRIDE">
......
......@@ -17,6 +17,7 @@ public class UpdateOrgReq {
/**
* 组织机构列表
*/
@NotBlank(message = "组织机构不能为空", groups = {ValidationApi.updateStatus.class})
private List<String> orgIdList;
......
......@@ -24,6 +24,16 @@
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
......@@ -70,5 +80,9 @@
<groupId>com.junmp.jyzb</groupId>
<artifactId>jyzb-api</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
</dependency>
</dependencies>
</project>
package com.junmp.jyzb.config.rabbitMQ;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectRabbitConfig {
// 死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
//队列 起名:TestDirectQueue
@Bean
public Queue TestDirectQueue() {
return new Queue("Test1Queue",true);
}
//Direct交换机 起名:TestDirectExchange
@Bean
DirectExchange TestDirectExchange() {
return new DirectExchange("Test1Exchange");
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("Test1Routing");
}
}
\ No newline at end of file
package com.junmp.jyzb.config.rabbitMQ;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ProductRabbitConfig {
//队列 起名:TestDirectQueue
@Bean
public Queue TestDirectQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("Test1Queue",true);
}
//Direct交换机 起名:TestDirectExchange
@Bean
DirectExchange TestDirectExchange() {
// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange("Test1Exchange",true,false);
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("Test1Routing");
}
@Bean
DirectExchange lonelyDirectExchange() {
return new DirectExchange("lonelyDirectExchange");
}
}
......@@ -6,16 +6,14 @@ import com.junmp.jyzb.api.bean.query.InventorySumReq;
import com.junmp.jyzb.entity.InventorySummary;
import com.junmp.jyzb.service.InventoryService;
import com.junmp.jyzb.service.InventorySummaryService;
import com.junmp.jyzb.service.MQProductService;
import com.junmp.jyzb.utils.ResponseResult;
import com.junmp.v2.common.bean.response.ApiRes;
import com.junmp.v2.db.api.page.PageResult;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.Map;
......@@ -28,8 +26,18 @@ public class InventoryController {
@Resource
public InventoryService inventoryService;
@Resource
public MQProductService mqProductService;
@Resource
public InventorySummaryService inventorySummaryService;
@GetMapping("/testMQ")
@ApiOperation("根据组织机构查询库存信息")
public ApiRes<Boolean> testMQ( ) {
String aa="";
return ApiRes.success(mqProductService.sendMessage());
}
@PostMapping("/GetEquipmentInfo")
@ApiOperation("根据组织机构查询库存信息")
public ApiRes<PageResult<InventorySummary>> getEquipmentInfo(@RequestBody InventorySumReq req) {
......
......@@ -50,7 +50,7 @@ public class PubOrgController {
}
@PostMapping("/ChangeOrgState")
@ApiOperation("改变组织机构状态信息")
public ApiRes<Boolean> changeOrgState(@RequestBody @Validated(ValidationApi.edit.class) UpdateOrgReq req) {
public ApiRes<Boolean> changeOrgState(@RequestBody @Validated(ValidationApi.updateStatus.class) UpdateOrgReq req) {
return ApiRes.success(pubOrgService.ChangeState(req));
}
@PostMapping("/GetLowOrg")
......
package com.junmp.jyzb.rabbitmq;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
* 创建消息接收监听�
* */
@Component
@RabbitListener(queues = "Test1Exchange") //监听的队列名�TestDirectQueue
public class DirectReceiver {
@RabbitHandler
private void modelConvert(String content, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
int retryCount = 0; // 初始化重试次数
boolean isRetry = false;
try {
// 进行消息处理和条件判断
if (content.equals("1")) {
System.out.println("DirectReceiver消费者收到消息 : " + content);
// 手动确认消息
channel.basicAck(deliveryTag, false);
} else {
// 不满足条件,拒绝消息并将其重新放回队列
}
} catch (Exception e) {
// 处理消息过程中发生异常,拒绝消息并将其重新放回队列
try {
channel.basicNack(deliveryTag, false, true);
isRetry = true;
} catch (IOException ex) {
// 处理异常
}
} finally {
if (isRetry) {
retryCount++;
// 判断重试次数是否达到限制
if (retryCount > 2) {
// 超过最大重试次数,将消息发送到死信队列
// 你需要创建一个死信队列,并在此处使用 channel.basicPublish() 将消息发送到死信队列
}
}
}
}
}
package com.junmp.jyzb.service;
import com.junmp.jyzb.api.bean.query.InventorySumReq;
import com.junmp.jyzb.entity.InventorySummary;
import com.junmp.v2.db.api.page.PageResult;
public interface MQProductService {
Boolean sendMessage();
}
package com.junmp.jyzb.service.impl;
import com.junmp.jyzb.api.bean.query.InventorySumReq;
import com.junmp.jyzb.service.LogSummaryService;
import com.junmp.jyzb.service.MQProductService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.PostMapping;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@Service
public class MQProductServicelmpl implements MQProductService {
@Autowired
RabbitTemplate rabbitTemplate;
//通过converAndSend方法发送消息
@Override
public Boolean sendMessage() {
//生成当前时间
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
try {
//将记录当前时间的信息发送到消息队列
rabbitTemplate.convertAndSend("Test1Exchange", "Test1Routing", "消息发送的时间为:" + createTime);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
}
......@@ -330,43 +330,24 @@ public class PubOrgServiceImpl extends ServiceImpl<PubOrgMapper, PubOrg> implem
}
public List<OrgTreeDto> getOrgList(QueryOrgReq req) {
String selfCode = "1369509498032808905";
List<OrgTreeDto> OrgDTOList=new ArrayList<>();
List<OrgTreeDto> orgDTOList = new ArrayList<>();
LambdaQueryWrapper<PubOrg> wrapper = new LambdaQueryWrapper<>();
String setNewCode;
List<PubOrg> orgList = new ArrayList<>();
if (ObjectUtil.isNotEmpty(req.getOrgCode())) {
setNewCode = req.getOrgCode().replaceAll("0+$", "");
} else {
setNewCode = "";
}
wrapper.eq(ObjectUtil.isNotEmpty(req.getIsDepartment()), PubOrg::getIsDepartment, req.getIsDepartment());
wrapper.eq( PubOrg::getStatusFlag, 1);
wrapper.eq(PubOrg::getStatusFlag, 1);
wrapper.likeRight(ObjectUtil.isNotEmpty(req.getOrgCode()), PubOrg::getOrgCode, setNewCode);
orgList = this.list(wrapper);
List<PubOrg> orgList = this.list(wrapper);
// 创建一个 Map 来存储机构 ID 和对应的 DTO 对象
Map<String, OrgTreeDto> orgDtoMap = new HashMap<>();
// 创建一个 Map 来存储父类的 parentid 和对应的父类节点
Map<String, OrgTreeDto> parentDtoMap = new HashMap<>();
if (ObjectUtil.isNotEmpty(req.getOrgCode()) && !req.getOrgCode().equals("330000000000")) {
LambdaQueryWrapper<PubOrg> wrapper2 = new LambdaQueryWrapper<>();
if (ObjectUtil.isNotEmpty(req.getOrgCode())) {
selfCode = req.getOrgCode();
}
wrapper2.eq(ObjectUtil.isNotEmpty(req.getOrgCode()), PubOrg::getOrgCode, selfCode);
PubOrg orgA= this.getOne(wrapper2);
String id = this.getOne(wrapper2).getOrgId().toString();
selfCode = id;
PubOrg org = this.getById(Long.parseLong(id));
OrgTreeDto orgDTO = new OrgTreeDto();
BeanPlusUtil.copyProperties(org, orgDTO);
orgDTO.setName(org.getDName());
orgDTO.setCode(org.getOrgCode());
orgDTO.setType(org.getOrgType());
orgDtoMap.put(org.getOrgId().toString(), orgDTO);
}
Map<String, List<OrgTreeDto>> parentDtoMap = new HashMap<>();
// 第一轮遍历:创建 OrgTreeDto 对象并将其加入到 Map 中
for (PubOrg org : orgList) {
......@@ -377,34 +358,42 @@ public class PubOrgServiceImpl extends ServiceImpl<PubOrgMapper, PubOrg> implem
orgDTO.setType(org.getOrgType());
orgDtoMap.put(org.getOrgId().toString(), orgDTO);
// 如果 parentid 是传进来的值,则将其作为顶级节点添加到 OrgDTOList 中
// 将根节点添加到 OrgDTOList 中
if (org.getOrgId().toString().equals(selfCode)) {
OrgDTOList.add(orgDTO);
orgDTOList.add(orgDTO);
}
// 将当前节点添加到 parentDtoMap 中,以便后续节点连接
String parentId = org.getOrgParentId().toString();
if (!parentDtoMap.containsKey(parentId)) {
parentDtoMap.put(parentId, new ArrayList<>());
}
parentDtoMap.get(parentId).add(orgDTO);
}
// 第二轮遍历:将子节点连接到父节点或添加为顶级节点
boolean isStartAddingChildren = false;
// 第二轮遍历:将子节点连接到父节点
for (PubOrg org : orgList) {
String parentId = org.getOrgParentId().toString();
OrgTreeDto orgDTO = orgDtoMap.get(org.getOrgId().toString());
// 如果找到传入的ID对应的节点,则开始添加子节点
if (org.getOrgId().toString().equals(selfCode)) {
isStartAddingChildren = true;
addChildToOrgDTO(orgDTO, orgDtoMap, parentDtoMap);
}
}
// 如果 isStartAddingChildren 为 true,并且 parentDtoMap 中有对应的父类节点,则将节点连接到父节点
if (isStartAddingChildren && parentDtoMap.containsKey(parentId)) {
OrgTreeDto parentDto = parentDtoMap.get(parentId);
parentDto.addChild(orgDTO);
}
return orgDTOList;
}
// 将当前节点添加到 parentDtoMap 中,以便后续节点连接
parentDtoMap.put(org.getOrgId().toString(), orgDTO);
private void addChildToOrgDTO(OrgTreeDto parentOrgDTO, Map<String, OrgTreeDto> orgDtoMap, Map<String, List<OrgTreeDto>> parentDtoMap) {
String parentId = parentOrgDTO.getOrgId().toString();
List<OrgTreeDto> children = parentDtoMap.get(parentId);
if (children != null) {
for (OrgTreeDto child : children) {
parentOrgDTO.addChild(child);
addChildToOrgDTO(child, orgDtoMap, parentDtoMap);
}
}
return OrgDTOList;
}
private LambdaQueryWrapper<PubOrg> createWrapper(QueryOrgReq req) {
......@@ -415,7 +404,6 @@ public class PubOrgServiceImpl extends ServiceImpl<PubOrgMapper, PubOrg> implem
wrapper.eq(ObjectUtil.isNotEmpty(req.getIsDepartment()), PubOrg::getIsDepartment, req.getIsDepartment());
//根据业务编码查询
wrapper.eq(ObjectUtil.isNotEmpty(req.getLevel()), PubOrg::getLevelFlag, req.getLevel());
wrapper.eq(ObjectUtil.isNotEmpty(req.getOrgId()), PubOrg::getOrgId,req.getOrgId());
wrapper.eq(ObjectUtil.isNotEmpty(req.getParentId()), PubOrg::getOrgParentId,Long.parseLong(req.getParentId()) );
wrapper.eq(ObjectUtil.isNotEmpty(req.getOrgCode()), PubOrg::getOrgCode, req.getOrgCode());
......
package com.junmp.jyzb.boot.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
......@@ -14,21 +16,29 @@ import springfox.documentation.swagger2.annotations.EnableSwagger2;
//@Configuration
//@EnableSwagger2
public class Swagger2Config {
// @Bean
// public Docket api() {
// return new Docket(DocumentationType.SWAGGER_2)
// .select()
// //.apis(RequestHandlerSelectors.basePackage("com.junmp.junmpProcess.controller"))
// .paths(PathSelectors.any())
// .build()
// .apiInfo(apiInfo());
// }
//
// private ApiInfo apiInfo() {
// return new ApiInfoBuilder()
// .title("API文档")
// .description("API文档")
// .version("1.0")
// .build();
// }
@Bean
public Docket api() {
return new Docket(DocumentationType.SWAGGER_2)
.select()
//.apis(RequestHandlerSelectors.basePackage("com.junmp.junmpProcess.controller"))
.paths(PathSelectors.any())
.build()
.apiInfo(apiInfo());
}
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
.title("API文档")
.description("API文档")
.version("1.0")
.build();
}
// 添加下面这个Bean,使用Jackson的命名策略
@Bean
public ObjectMapper jacksonObjectMapper() {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); // 使用下划线命名策略,即属性名转为小写,并使用下划线分隔
return objectMapper;
}
}
......@@ -11,6 +11,13 @@ spring:
allow-bean-definition-overriding: true
application:
name: jyzb-app
rabbitmq:
host: 192.168.3.188
port: 5672
username: root
password: 123456
#虚拟host 可以不设置,使用server默认host
virtual-host: /
profiles:
#@spring.active@
active: local
......
package com.junmp.jyzb.config;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 创建消息接收监听类
* */
@Component
@RabbitListener(queues = "TestDirectQueue") //监听的队列名称 TestDirectQueue
public class DirectReceiver {
@RabbitHandler
public void process(Object testMessage) {
System.out.println("DirectReceiver消费者收到消息 : " + testMessage);
<<<<<<< HEAD
=======
>>>>>>> develop
}
}
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论