Commit d64614ae by 赵剑炜

添加ws仓库设备

parent 05ca141f
package com.junmp.jyzb.api.util;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import java.util.List;
@Data
public class WsWarehouseReq {
@JsonProperty("deviceId")
private String deviceId;
@JsonProperty("onlineState")
private String onlineState;
@JsonProperty("orgId")
private String orgId;
@JsonProperty("warehouseId")
private String warehouseId;
@JsonProperty("boxList")
private List<warehouseDevs> warehouseDevs;
@Data
// 内部类表示box
public static class warehouseDevs {
@JsonProperty("devId")
private String devId;
@JsonProperty("devIP")
private int devIP;
@JsonProperty("devName")
private String devName;
@JsonProperty("devState")
private String devState;
@JsonProperty("devType")
private String devType;
}
}
\ No newline at end of file
package com.junmp.jyzb.cache;
import com.junmp.jyzb.api.constant.JYZBConstant;
import com.junmp.v2.cache.AbstractRedisCache;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@Component
public class DeviceRedisCache extends AbstractRedisCache<String> {
public DeviceRedisCache(RedisTemplate<String, String> redisTemplate) {
super(redisTemplate);
}
@Override
public String getCommonKeyPrefix() {
return JYZBConstant.JYZB_CACHE_PREFIX;
}
}
...@@ -44,35 +44,10 @@ public class CabinetServiceImpl extends ServiceImpl<CabinetMapper, Cabinet> impl ...@@ -44,35 +44,10 @@ public class CabinetServiceImpl extends ServiceImpl<CabinetMapper, Cabinet> impl
private PubOrgService pubOrgService; private PubOrgService pubOrgService;
@Resource @Resource
private CabinetBoxService cabinetBoxService; private CabinetBoxService cabinetBoxService;
@Resource
private PolicemanService policemanService;
@Resource
private CabinetBoxPoliceService cabinetBoxPoliceService;
@Resource
private CabinetBoxPoliceMapper cabinetBoxPoliceMapper;
@Autowired @Autowired
private RabbitMQSendMsg MQ; private RabbitMQSendMsg MQ;
private List<Long> getAllOrgId(CabinetReq req, String includeLowerLevel) {
List<Long> allOrg = new ArrayList<>();
if (includeLowerLevel.equals("false")) {
allOrg.add(req.getOrgId());
} else if (includeLowerLevel.equals("true")) {
//查询某组织机构的本级及下级
allOrg = pubOrgService.getLowerOrg(req.getOrgId());
}
return allOrg;
}
private List<Cabinet> getAllCabinetByOrg( List<Long> allOrgId) {
List<Cabinet> allCabinet = new ArrayList<>();
allCabinet = cabinetMapper.getAllCabinetByOrgList(allOrgId);
return allCabinet;
}
/** /**
* 添加单警柜 * 添加单警柜
* *
......
...@@ -8,10 +8,18 @@ import com.alibaba.fastjson2.JSON; ...@@ -8,10 +8,18 @@ import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.junmp.iot.server.ws.WebSocketServerListener;
import com.junmp.iot.server.ws.impl.DefaultWebSocketServerProtocol;
import com.junmp.junmpProcess.redis.FlowRedisCache;
import com.junmp.jyzb.api.util.WsCabinetReq;
import com.junmp.jyzb.api.util.WsWarehouseReq;
import com.junmp.jyzb.cache.DeviceRedisCache;
import com.junmp.jyzb.cache.OnlineRedisCache; import com.junmp.jyzb.cache.OnlineRedisCache;
import com.junmp.jyzb.entity.Cabinet; import com.junmp.jyzb.entity.Cabinet;
import com.junmp.jyzb.entity.CabinetBox;
import com.junmp.jyzb.entity.Temp.OnlineState; import com.junmp.jyzb.entity.Temp.OnlineState;
import com.junmp.jyzb.service.CabinetService; import com.junmp.jyzb.service.CabinetService;
import com.junmp.jyzb.utils.DateTimeUtil;
import com.junmp.v2.ws.api.enums.ClientMsgType; import com.junmp.v2.ws.api.enums.ClientMsgType;
import com.junmp.v2.ws.api.enums.ServerMsgType; import com.junmp.v2.ws.api.enums.ServerMsgType;
import com.junmp.v2.ws.api.enums.SysMsgTypeEnum; import com.junmp.v2.ws.api.enums.SysMsgTypeEnum;
...@@ -22,6 +30,8 @@ import com.junmp.v2.ws.channel.WsSessionContainer; ...@@ -22,6 +30,8 @@ import com.junmp.v2.ws.channel.WsSessionContainer;
import com.junmp.v2.ws.message.WsMessageHandle; import com.junmp.v2.ws.message.WsMessageHandle;
import com.junmp.v2.ws.session.WsSessionHandle; import com.junmp.v2.ws.session.WsSessionHandle;
import io.jsonwebtoken.JwtException; import io.jsonwebtoken.JwtException;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -32,164 +42,61 @@ import javax.websocket.server.PathParam; ...@@ -32,164 +42,61 @@ import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint; import javax.websocket.server.ServerEndpoint;
import java.io.IOException; import java.io.IOException;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Component @Component
@ServerEndpoint("/warehouse/{data}") @Slf4j
public class WarehouseWebSocketServer { public class WarehouseWebSocketServer implements WebSocketServerListener {
private static final Logger log = LoggerFactory.getLogger(com.junmp.v2.ws.server.ScreenWebSocketServer.class); // private static final Logger log = LoggerFactory.getLogger(com.junmp.v2.ws.server.ScreenWebSocketServer.class);
@Resource @Resource
public CabinetService cabinetService; public CabinetService cabinetService;
@Resource @Resource
private OnlineRedisCache onlineRedisCache; private DeviceRedisCache redisCache;
public WarehouseWebSocketServer() { @Override
public String uri() {
return "/device/ws";
} }
@OnOpen @SneakyThrows
public void onOpen(Session session, @PathParam("data") String data) { @Override
String userId = null; public void onText(DefaultWebSocketServerProtocol protocol) {
try { try {
// 获取当前时间的时间戳
Instant now = Instant.now();
// 转换为long类型
long timestamp = now.toEpochMilli();
Cabinet cabinet=cabinetService.getOne(new LambdaQueryWrapper<Cabinet>()
.eq(Cabinet::getCabinetNum, data));
cabinet.setOnlineState(1);
cabinet.setUpdateTime(DateTime.now());
// cabinetService.updateById(cabinet);
// //将数据放入redis
// onlineRedisCache.addSessionValue(session.getId(),type,typeId,timestamp);
} catch (JwtException var15) {
log.error("建立连接异常>>" + var15.getMessage());
try {
session.close();
log.warn("因连接失败,会话自动关闭");
} catch (IOException var14) {
var14.printStackTrace();
log.error("会话关闭异常>>" + var14.getMessage());
}
}
WsSessionContainer wsContainer = new WsSessionContainer(session);
WsSession<WsSessionContainer> socketSession = new WsSession();
WsMsgDto replyMsg = new WsMsgDto();
boolean var13 = false;
try { String text = protocol.readText();
var13 = true; ObjectMapper objectMapper = new ObjectMapper();
replyMsg.setServerMsgType(ServerMsgType.REPLY_MSG_TYPE.getCode()); WsWarehouseReq deviceData = objectMapper.readValue(text, WsWarehouseReq.class);
// replyMsg.setToUserId(userId); deviceData.setOnlineState("true");
replyMsg.setData(session.getId()); String channel = protocol.requestMessage().getChannelId();
socketSession.setSessionId(session.getId()); String jsonData=objectMapper.writeValueAsString(deviceData);
socketSession.setUserId(userId); redisCache.put(channel,jsonData+":"+deviceData.getOrgId()+":"+deviceData.getWarehouseId());
socketSession.setWsApi(wsContainer);
socketSession.setToken(data);
// socketSession.setData();
socketSession.setConnectionTime(System.currentTimeMillis());
WsSessionHandle.addWsSession(socketSession);
var13 = false;
} finally {
if (var13) {
wsContainer.writeAndFlush(replyMsg);
WsCallbackApi wsCallbackApi = WsMessageHandle.getWsCallback(SysMsgTypeEnum.LISTENER_ON_OPEN.getCode());
if (ObjectUtil.isNotEmpty(wsCallbackApi)) {
wsCallbackApi.callback(SysMsgTypeEnum.LISTENER_ON_OPEN.getCode(), (Object)null, socketSession);
}
log.info("已建立连接"); protocol.response("true");
}
} }
catch (Exception e)
wsContainer.writeAndFlush(replyMsg); {
WsCallbackApi wsCallbackApi = WsMessageHandle.getWsCallback(SysMsgTypeEnum.LISTENER_ON_OPEN.getCode()); protocol.response("false");
if (ObjectUtil.isNotEmpty(wsCallbackApi)) {
wsCallbackApi.callback(SysMsgTypeEnum.LISTENER_ON_OPEN.getCode(), (Object)null, socketSession);
} }
log.info("已建立连接");
} }
@OnMessage @Override
public void onMessage(String message, Session channel) { public void onClose(DefaultWebSocketServerProtocol protocol) {
WsMsgDto wsMsgDto = (WsMsgDto) JSON.parseObject(message, WsMsgDto.class);
WsSession<WsSessionContainer> wsSession = WsSessionHandle.getSessionBySessionId(channel.getId());
if (ObjectUtil.isNotEmpty(wsSession) && ClientMsgType.USER_HEART.getCode().equals(wsMsgDto.getClientMsgType())) {
log.info(StrUtil.format("来自与客户端[{}]的心跳消息。", new Object[]{wsMsgDto.getFromUserId()}));
if (ObjectUtil.isNotEmpty(wsSession)) {
wsSession.setLastActiveTime(System.currentTimeMillis());
}
} else if (!ObjectUtil.isEmpty(wsMsgDto.getFromUserId())) { String channel= protocol.requestMessage().getChannelId();
if (ObjectUtil.isNotEmpty(wsSession)) { String key= redisCache.get("*:" + channel);
wsSession.setLastActiveTime(System.currentTimeMillis()); redisCache.remove(key);
WsCallbackApi callbackApi; WebSocketServerListener.super.onClose(protocol);
if (ClientMsgType.OPEN_AI_TYPE.getCode().equals(wsMsgDto.getClientMsgType())) {
log.info(StrUtil.format("来自与客户端[{}]调用OpenAi的消息。", new Object[]{wsMsgDto.getFromUserId()}));
callbackApi = (WsCallbackApi) SpringUtil.getBean(WsCallbackApi.class);
if (ObjectUtil.isNull(callbackApi)) {
log.warn("回调WsCallbackApi未注册.");
} else {
callbackApi.callback(wsMsgDto.getClientMsgType(), wsMsgDto.getData(), wsSession);
}
} else {
callbackApi = WsMessageHandle.getWsCallback(wsMsgDto.getClientMsgType());
if (ObjectUtil.isNotEmpty(callbackApi)) {
callbackApi.callback(wsMsgDto.getClientMsgType(), wsMsgDto, wsSession);
} else {
log.warn("未知的消息类型");
channel.getAsyncRemote().sendText("{\"serverMsgType\":\"未知的消息类型\"}");
}
}
}
}
} }
@OnClose @Override
public void onClose(Session session) { public void onBinary(DefaultWebSocketServerProtocol protocol) {
try { WebSocketServerListener.super.onBinary(protocol);
WsSession<WsSessionContainer> wsSession = WsSessionHandle.getSessionBySessionId(session.getId());
WsCallbackApi wsCallbackApi = WsMessageHandle.getWsCallback(SysMsgTypeEnum.LISTENER_ON_CLOSE.getCode());
if (ObjectUtil.isNotEmpty(wsSession) && ObjectUtil.isNotEmpty(wsCallbackApi)) {
OnlineState os=onlineRedisCache.getDataValue(wsSession.getSessionId());
String type= os.getType();
String typeId=os.getTypeId();
if (type.equals("cabinet"))//警柜断开连接
{
Cabinet cabinet=cabinetService.getOne(new LambdaQueryWrapper<Cabinet>()
.eq(Cabinet::getCabinetNum, typeId));
cabinet.setOnlineState(0);
cabinet.setUpdateTime(DateTime.now());
cabinetService.updateById(cabinet);
onlineRedisCache.remove(wsSession.getSessionId());
} else if (type.equals("warehouse")) {//仓库主机已连接
}
wsCallbackApi.callback(SysMsgTypeEnum.LISTENER_ON_CLOSE.getCode(), (Object)null, wsSession);
}
} finally {
WsSessionHandle.delById(session.getId());
log.warn("会话已关闭");
}
} }
@OnError
public void onError(Session session, Throwable error) {
WsSession<WsSessionContainer> wsSession = WsSessionHandle.getSessionBySessionId(session.getId());
WsCallbackApi wsCallbackApi = WsMessageHandle.getWsCallback(SysMsgTypeEnum.LISTENER_ON_ERROR.getCode());
if (ObjectUtil.isNotEmpty(wsCallbackApi)) {
wsCallbackApi.callback(SysMsgTypeEnum.LISTENER_ON_ERROR.getCode(), (Object)null, wsSession);
}
}
} }
\ No newline at end of file
package com.junmp.police.test;
import org.apache.tomcat.util.digester.DocumentProperties;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
public class Base64Test {
public static void main(String args[]){
// String str = "abc";
// byte[] val = str.getBytes(StandardCharsets.UTF_8);
// String base64EncodedChunk = Base64.getEncoder().encodeToString(val);
// System.out.println(base64EncodedChunk);
}
}
...@@ -10,7 +10,7 @@ thing: ...@@ -10,7 +10,7 @@ thing:
server: server:
websocket: websocket:
start: true start: true
port: 10040
# 配置第三方请求 # 配置第三方请求
junmp: junmp:
swagger: swagger:
......
...@@ -6,7 +6,11 @@ easy-es: ...@@ -6,7 +6,11 @@ easy-es:
address: 192.168.3.121:9200 address: 192.168.3.121:9200
username: username:
password: password:
thing:
server:
websocket:
start: true
port: 10030
# 配置第三方请求 # 配置第三方请求
junmp: junmp:
swagger: swagger:
......
...@@ -210,6 +210,7 @@ public class FlowTaskServiceImpl extends FlowServiceFactory implements IFlowTask ...@@ -210,6 +210,7 @@ public class FlowTaskServiceImpl extends FlowServiceFactory implements IFlowTask
} }
String objJson= redisCache.get(processInstanceId); String objJson= redisCache.get(processInstanceId);
redisCache.remove(processInstanceId); redisCache.remove(processInstanceId);
List<MessageSendReq> msgs= JSONObject.parseArray(objJson,MessageSendReq.class); List<MessageSendReq> msgs= JSONObject.parseArray(objJson,MessageSendReq.class);
if (CollectionUtil.isNotEmpty(msgs)){ if (CollectionUtil.isNotEmpty(msgs)){
msgs.forEach(msg-> msgs.forEach(msg->
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论