Redis实现消息队列(1)——通过Jedis发布和订阅消息

    xiaoxiao2022-07-06  201

     1. 引入jar依赖

    <!-- Spring Boot starter for Redis; supplies the spring.redis.* configuration used below. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!-- easy-core: presumably the source of @EasyBean, IHandlerOnRefreshed, ThreadManager, LogUtils (verify). -->
    <dependency>
        <groupId>io.github.xiaoyudeguang</groupId>
        <artifactId>easy-core</artifactId>
        <version>1.2.23-RELEASE</version>
    </dependency>
    <!-- Jedis client; no version given here, so it is presumably resolved by the parent's dependency management (confirm). -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
    </dependency>

     2. JedisUtils工具类

    @EasyBean(todo = { "" }) public class JedisUtils implements IHandlerOnRefreshed, IHandlerOnChanged{ private static JedisPool jedisPool; @Value("${spring.redis.host}") private String reids_host; @Value("${spring.redis.port}") private int reids_port; @Value("${spring.redis.timeout}") private String timeout; @Value("${spring.redis.password}") private String password; @Value("${spring.redis.database}") private int database; @Override public void doOnRefreshed(ApplicationContext context) throws Exception { jedisPool = new JedisPool(new JedisPoolConfig(), reids_host, reids_port, 100000, password, database); } @Override public void doOnChanged(ApplicationContext context) throws Exception { jedisPool.close(); } public static Long publish(String channel, Object message ) { return jedisPool.getResource().publish(channel, JSON.toJSONString(message)); } public static void subscribe(JedisPubSub jedisPubSub, String... channels) { jedisPool.getResource().subscribe(jedisPubSub, channels); } }

    3. 发布消息

    // Publish a message to the named channel. JedisUtils.publish serializes the
    // second argument with JSON.toJSONString before sending it.
    JedisUtils.publish("自定义通道名称", "消息内容");

    4. 订阅消息

    /**
     * Redis message consumer: subscribes itself to a channel and logs every message received.
     */
    @ApplicationRefreshedBean(todo = { "Redis消息消费者" }, order = 3)
    public class MsgSubscriber extends JedisPubSub implements IHandlerOnRefreshed {

        /**
         * Starts the subscription on a thread handed to {@code ThreadManager},
         * so this callback itself does not run the subscribe call.
         *
         * @param context the application context supplied by the framework (unused)
         * @throws Exception declared by the handler interface
         */
        @Override
        public void doOnRefreshed(ApplicationContext context) throws Exception {
            Thread listener = new Thread(this::listen);
            ThreadManager.execute(listener);
        }

        /** Registers this instance as the listener for the channel. */
        private void listen() {
            JedisUtils.subscribe(this, "自定义通道名称");
        }

        /**
         * Logs each message delivered on a subscribed channel.
         *
         * @param channel the channel the message arrived on
         * @param message the raw message payload
         */
        @Override
        public void onMessage(String channel, String message) {
            String title = channel + "订阅消息";
            LogUtils.info(this, title, message);
        }
    }

     

    最新回复(0)