WebSocket+定时任务实时通知前台变化

    xiaoxiao2022-07-05  183

    后台发生变化时,可以通过事件触发或者定时任务来通知前台,此处用的是定时任务。 1、WebSocketConfig配置类:

    /**
     * WebSocket configuration class (program: wealth_management).
     *
     * <p>Its only job is to expose a {@code ServerEndpointExporter} bean.
     * NOTE(review): presumably this is what lets {@code @ServerEndpoint}
     * classes be registered when running in an embedded container - confirm
     * against the Spring documentation.
     *
     * @author SGQ
     * @since 2019/04/23
     */
    @Configuration
    public class WebSocketConfig {

        /**
         * Builds the exporter bean.
         *
         * @return a freshly created {@code ServerEndpointExporter}
         */
        @Bean
        public ServerEndpointExporter serverEndpointExporter() {
            final ServerEndpointExporter exporter = new ServerEndpointExporter();
            return exporter;
        }
    }

    2、WebSocketServer服务端,给前端调用:

    /** * @program: wealth.product.service * @description: WebSocketServer核心类-实时获取产品剩余额度 * @author: SGQ * @create: 2019/05/15 */ @ServerEndpoint("/product/webSocket/RemainAmountServer/{appointmentId}/{productCode}") @Component public class RemainAmountWebSocketServer { private static final Logger logger = LoggerFactory.getLogger(RemainAmountWebSocketServer.class); // 解决WebSocket不能注入的问题 private static ApplicationContext applicationContext; public static void setApplicationContext(ApplicationContext context) { applicationContext = context; } private ProductService productService; //静态变量,用来记录当前在线连接数 private static int onlineCount = 0; //ConcurrentHashMap的线程安全map,用来存放每个客户端对应的MyWebSocket对象。 private static ConcurrentHashMap<String, RemainAmountWebSocketServer> socketMap = new ConcurrentHashMap<>(); private static ConcurrentHashMap<String, TaskScheduler> taskMap = new ConcurrentHashMap<>(); //与某个客户端的连接会话,需要通过它来给客户端发送数据 private Session session; //接收appointmentId private String appointmentId = ""; /** * 连接建立成功调用的方法 */ @OnOpen public void onOpen(Session session, @PathParam("appointmentId") String appointmentId, @PathParam("productCode") String productCode) { this.session = session; socketMap.put(appointmentId, this); // 添加到Map中 addOnlineCount(); //在线数加1 this.appointmentId = appointmentId; logger.info("有新窗口开始监听:" + appointmentId + ",当前在线人数为" + getOnlineCount()); try { // 获得service实例 productService = applicationContext.getBean(ProductServiceImpl.class); ResponseCommonType responseCommonType = new ResponseCommonType(); responseCommonType.setSuccess(false); ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.initialize(); // 初始化 TaskScheduler task = new TaskScheduler(scheduler); task.startCron(new Runnable() { @Override public void run() { logger.info("系统轮询获取剩余额度中。。。"); RemainAmountDTO remainAmountDTO = productService.getProductRemainAmount(productCode); if (remainAmountDTO == null) { responseCommonType.setSuccess(false); 
responseCommonType.setResponseMessage(appointmentId + ":failed"); responseCommonType.setData("无此产品"); } else { responseCommonType.setSuccess(true); responseCommonType.setResponseMessage(appointmentId + ":success"); responseCommonType.setData(remainAmountDTO); } sendInfo(JSON.toJSONString(responseCommonType), appointmentId); } }, new CronTrigger("0/2 * * * * *")); // 在关闭连接时关闭定时任务 taskMap.put(appointmentId, task); } catch (Exception e) { logger.error("websocket 异常:" + e.toString(), e); } } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { socketMap.remove(appointmentId); //从map中删除 if (taskMap.get(appointmentId) != null) { taskMap.get(appointmentId).stopCron(); taskMap.remove(appointmentId); logger.info("轮询任务关闭:" + appointmentId); } subOnlineCount(); //在线数减1 logger.info("有一连接关闭!当前在线人数为" + getOnlineCount()); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息 */ @OnMessage public void onMessage(String message, Session session) { logger.info("收到来自窗口" + appointmentId + "的信息:" + message); //群发消息 for (Map.Entry<String, RemainAmountWebSocketServer> entry : socketMap.entrySet()) { try { entry.getValue().sendMessage(message); } catch (Exception e) { logger.error("收到客户端消息后调用的方法 error:" + e.toString(), e); } } } /** * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { logger.error("发生错误:" + error.toString(), error); } /** * 实现服务器主动推送 */ public void sendMessage(String message) { try { this.session.getBasicRemote().sendText(message); } catch (Exception e) { logger.error("sendMessage error: " + e.toString(), e); } } /** * 群发自定义消息 */ public static void sendInfo(String message, String appointId) { try { //这里可以设定只推送给这个sid的,为null则全部推送 if (StringUtils.isEmpty(appointId)) { for (Map.Entry<String, RemainAmountWebSocketServer> entry : socketMap.entrySet()) { entry.getValue().sendMessage(message); } } else { socketMap.get(appointId).sendMessage(message); } } catch (Exception e) { logger.error("推送消息到窗口 error: " + e.toString(), e); } 
logger.info("推送消息到窗口" + appointId + ",推送内容:" + message); } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { RemainAmountWebSocketServer.onlineCount++; } public static synchronized void subOnlineCount() { RemainAmountWebSocketServer.onlineCount--; } }

    3、定时任务task:

    /** * @program: demo * @description: 定时任务处理类 * @author: SGQ * @create: 2019/04/25 */ public class TaskScheduler { private static final Logger logger = LoggerFactory.getLogger(TaskScheduler.class); private ThreadPoolTaskScheduler threadPoolTaskScheduler; /** * 在ScheduledFuture中有一个cancel可以停止定时任务。 */ private ScheduledFuture<?> future; public TaskScheduler(ThreadPoolTaskScheduler threadPoolTaskScheduler) { this.threadPoolTaskScheduler = threadPoolTaskScheduler; } /** * 启动任务 **/ public void startCron(Runnable runnable, CronTrigger cronTrigger) { logger.info("TaskScheduler#startCron()"); future = threadPoolTaskScheduler.schedule(runnable, cronTrigger); } /** * 停止任务 **/ public void stopCron() { logger.info("TaskScheduler#stopCron()"); if (future != null) { future.cancel(true); } } /** * 变更任务间隔,再次启动 **/ public void changeCron(Runnable runnable, CronTrigger cronTrigger) { logger.info("TaskScheduler#changeCron()"); stopCron();// 先停止,在开启. future = threadPoolTaskScheduler.schedule(runnable, cronTrigger); } }
    最新回复(0)