现今的互联网系统,有两种NoSQL使用最为广泛,那就是Redis和MongoDB。这一章将介绍Redis和SpringBoot的结合。
Redis是一种运行在内存的数据库,支持7种数据类型的存储,运行速度很快,性能十分高效,大幅提高网站查询访问。
要使用Redis,首先加入Redis依赖,通过配置文件applicatiuon-properties进行配置:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <exclusions> <!--不依赖Redis的异步客户端lettuce --> <exclusion> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> </exclusion> </exclusions> </dependency> <!--引入Redis的客户端驱动jedis --> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency>默认情况下,spring-boot-starter-data-redis会依赖Lettuce的Redis客户端驱动,但在一般的项目中,使用Jedis,所以在代码中使用<extensions>元素将其依赖排除。
Redis是一种键值数据库,是以字符串类型为中心的,它能够支持多种数据类型,包括字符串、散列、链表、集合、有序集合、基数和地理位置等。重点讨论字符串、散列、链表、集合和有序集合的使用。(使用率较高)
这里只讨论Spring推荐使用的类库Jedis的使用。Spring提供了一个RedisConnectionFactory接口,通过它可以生成一个RedisConnection接口对象,而它是对Redis底层接口的封装。
下面开发一个简单的RedisConnectionFactory接口对象:
/**** imports ****/ //@Configuration public class RedisConfig { private RedisConnectionFactory connectionFactory = null; @Bean(name = "redisConnectionFactory") public RedisConnectionFactory initConnectionFactory() { if (this.connectionFactory != null) { return this.connectionFactory; } JedisPoolConfig poolConfig = new JedisPoolConfig(); // 最大空闲数 poolConfig.setMaxIdle(50); // 最大连接数 poolConfig.setMaxTotal(100); // 最大等待毫秒数 poolConfig.setMaxWaitMillis(2000); // 创建Jedis连接工厂 JedisConnectionFactory connectionFactory = new JedisConnectionFactory(poolConfig); // 配置Redis连接服务器 RedisStandaloneConfiguration rsc = connectionFactory.getStandaloneConfiguration(); rsc.setHostName("192.168.10.128"); rsc.setPort(6379); rsc.setPassword(RedisPassword.of("123456")); this.connectionFactory = connectionFactory; return connectionFactory; } }这里通过一个连接池的配置创建了RedisConnectionFactory,通过它就能够创建RedisConnection接口对象。但在使用一条连接时,要先从RedisConnectionFactory工厂获取,然后在使用完成后还要自己关闭,Spring为了进一步简化开发,提供了RedisTemplate。
先来创建RedisTemplate,代码如下(接上面RedisConnectionFactory接口代码):
@Bean(name="redisTemplate") public RedisTemplate<Object, Object> initRedisTemplate() { RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(initConnectionFactory()); return redisTemplate; }然后测试它,代码如下:
/**** imports ****/ public class Chapter7Main { public static void main(String[] args) { AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(RedisConfig.class); RedisTemplate redisTemplate = ctx.getBean(RedisTemplate.class); redisTemplate.opsForValue().set("key1", "value1"); redisTemplate.opsForHash().put("hash", "field", "hvalue"); }这里使用Java配置文件RedisConfig来创建Spring IoC容器,然后从中获取RedisTemplate对象,接着设置一个键为"key1",值为“value1”的键值对。
在这里,需要清楚的是,Redis是一种基于字符串存储的NoSQL,而Java是基于对象的语言,对象是无法存储到Redis中,不过Java提供了序列化机制,只要类实现了java.io.Serializable接口,就代表类的对象能够进行序列化,通过将类对象进行序列化就能够得到二进制字符串,这样Redis就可以将这些类对象以字符串进行存储。Java也可以将那些二进制字符串通过反序列化转为对象。对于序列化器,Spring提供了RedisSerializable接口,它有两个方法,一个是serialize,能够将序列化对象转换为二进制字符串;另一个是deserialize,能够通过反序列化将二进制字符串转化为Java对象。Spring会默认使用JdkSerializationRedisSerializer对对象进行序列化和反序列化。
我们希望RedisTemplate可以将Redis的键以普通字符串保存,所以将上面RedisTemplate代码进行修改:
@Bean(name="redisTemplate") public RedisTemplate<Object, Object> initRedisTemplate() { RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(initConnectionFactory()); //RedisTemplate会自动初始化StringRedisSerializer,所以这里直接获取 RedisSerializer<String> stringRedisSerializer = redisTemplate.getStringSerializer(); //设置字符串序列化器,这样Spring就会把Redis的key当作字符串处理 redisTemplate.setKeySerializer(stringRedisSerializer); redisTemplate.setHashKeySerializer(stringRedisSerializer); redisTemplate.setHashValueSerializer(stringRedisSerializer); return redisTemplate; }我们通过主动将Redis的键和散列结构的field和value均采用了字符串序列化器,这样把它们转换出来时就会采用字符串了。 hash数据类型也全部采用了字符串的形式,因为设置了StringRedisSerializer序列化器操作它们。
值得注意的是这两行代码:
redisTemplate.opsForValue().set("key1", "value1"); redisTemplate.opsForHash().put("hash", "field", "hvalue");首先在操作key1时,redisTemplate会先从连接工厂(RedisConnectionFactory)中获取一个连接,然后执行对应的Redis命令,再关闭这条连接。所以我们可以看到这个过程是两条连接的操作,这样显然浪费资源,为了克服这个问题,Spring为我们提供了RedisCallback和SessionCallback两个接口。在此之前,先来了解一下Spring对Redis数据类型的封装。
上面已经说过Redis支持7种数据类型,为此Spring为每一种数据结构的操作都提供了对应的操作接口,如下图:
它们都可以通过RedisTemplate得到,得到的方法也很简单,代码如下:
//获取地理位置操作接口 redisTemplate.opsForGeo(); //获取散列操作接口 redisTemplate.opsForHash(); //获取基数操作接口 redisTemplate.opsForHyperLogLog(); //获取列表(链表)操作接口 redisTemplate.opsForList(); //获取集合操作接口 redisTemplate.opsForSet(); //获取字符串操作接口 redisTemplate.opsForValue(); //获取有序集合操作接口 redisTemplate.opsForZSet();这样就可以通过各类的操作接口来操作不同的数据类型。下面来看如何通过Spring操作Redis的各种数据类型。
上面我们说到SessionCallback和RedisCallback接口,它们的作用是让RedisTemplate进行回调,通过它们可以在同一条连接下执行多个Redis命令。其中SessionCallback提供了良好的封装,对于开发者比较友好,优先使用它。下面是SessionCallbak接口实现代码:
public static void useSessionCallback(RedisTemplate redisTemplate) { redisTemplate.execute(new SessionCallback() { @Override public Object execute(RedisOperations ro) throws DataAccessException { ro.opsForValue().set("key1", "value1"); ro.opsForHash().put("hash", "field", "hvalue"); return null; } }); }上述代码使用了匿名内部类,也可以用Java8或者更高版本的Lambda表岛是改写:
public static void useSessionCallback(RedisTemplate redisTemplate) { redisTemplate.execute((RedisOperations ro) -> { ro.opsForValue().set("key1", "value1"); ro.opsForHash().put("hash", "field", "hvalue"); return null; }); }这样就实现了RedisTemplate使用同一条Redis连接进行回调,从而在同一条连接下执行多个方法。
首先,在配置文件application.properties中添加如下代码:
#配置连接池属性 spring.redis.jedis.pool.min-idle=5 spring.redis.jedis.pool.max-active=10 spring.redis.jedis.pool.max-idle=10 spring.redis.jedis.pool.max-wait=2000 #配置服务器属性 spring.redis.port=6379 spring.redis.host=192.168.11.128 spring.redis.password=123456这里配置连接池和服务器属性,用以连接Redis服务器,SpringBoot的自动装配机制就会读取这些配置来生成有关Redis的操作对象,它会自动生成RedisConnectionFactory\RedisTemplate\StringRedisTemplate等常用的Redis对象。RedisTemplate会默认使用JdkSerializationRedisSerializer进行序列化键值,这样便能够存储到Redis服务器中。下面我们在Spring Boot的启动文件中修改RedisTemplate的序列化器,代码如下:
@SpringBootApplication(scanBasePackages = "com.springboot.chapter7") //指定扫描的MyBatis Mapper @MapperScan(basePackages = "com.springboot.chapter7", annotationClass = Repository.class) //使用注解驱动缓存机制 @EnableCaching public class Chapter7Application { // 设置RedisTemplate的序列化器 private void initRedisTemplate() { RedisSerializer stringSerializer = redisTemplate.getStringSerializer(); redisTemplate.setKeySerializer(stringSerializer); redisTemplate.setHashKeySerializer(stringSerializer); } // RedisTemplate @Autowired private RedisTemplate redisTemplate = null; @Autowired private RedisConnectionFactory connectionFactory = null; // 自定义初始化方法 @PostConstruct public void init() { initRedisTemplate(); } ... }首先通过@Autired注入由Spring Boot根据配置生成的RedisTemplate对象,然后利用Spring Bean的生命周期的特性使用注解@PostConstruct自定义初始化方法。在这个方法里,把RedisTemplate中的键序列化器修改为StringRedisSerializer。这样我们在Redis服务器上得到的键和散列的field都可以以字符串存储了。
下面详细操作Redis数据类型:
(1)字符串和散列
/**** imports ****/ @Controller @RequestMapping("/redis") public class RedisController { @Autowired private RedisTemplate redisTemplate = null; @Autowired private StringRedisTemplate stringRedisTemplate = null; @RequestMapping("/stringAndHash") @ResponseBody public Map<String, Object> testStringAndHash() { redisTemplate.opsForValue().set("key1", "value1"); // 注意这里使用了JDK的序列化器,所以Redis保存的时候不是整数,不能运算 redisTemplate.opsForValue().set("int_key", "1"); stringRedisTemplate.opsForValue().set("int", "1"); // 使用运算 stringRedisTemplate.opsForValue().increment("int", 1); // 获取底层Jedis连接 Jedis jedis = (Jedis) stringRedisTemplate.getConnectionFactory().getConnection().getNativeConnection(); // 减一操作,这个命令RedisTemplate不支持,所以笔者先获取底层的连接再操作 jedis.decr("int"); Map<String, String> hash = new HashMap<String, String>(); hash.put("field1", "value1"); hash.put("field2", "value2"); // 存入一个散列数据类型 stringRedisTemplate.opsForHash().putAll("hash", hash); // 新增一个字段 stringRedisTemplate.opsForHash().put("hash", "field3", "value3"); // 绑定散列操作的key,这样可以连续对同一个散列数据类型进行操作 BoundHashOperations hashOps = stringRedisTemplate.boundHashOps("hash"); // 删除两个个字段 hashOps.delete("field1", "field2"); // 新增一个字段 hashOps.put("filed4", "value5"); Map<String, Object> map = new HashMap<String, Object>(); map.put("success", true); return map; }(2)列表
在Redis里列表是一种链表结构,查询性能不高,但增删结点的性能高。
@RequestMapping("/list") @ResponseBody public Map<String, Object> testList() { // 插入两个列表,注意它们再链表的顺序 // 链表从左到右顺序为v10,v8,v6,v4,v2 stringRedisTemplate.opsForList().leftPushAll("list1", "v2", "v4", "v6", "v8", "v10"); // 链表从左到右顺序为v1,v2,v3,v4,v5,v6 stringRedisTemplate.opsForList().rightPushAll("list2", "v1", "v2", "v3", "v4", "v5", "v6"); // 绑定list2链表操作 BoundListOperations listOps = stringRedisTemplate.boundListOps("list2"); // 从右边弹出一个成员 Object result1 = listOps.rightPop(); // 获取定位元素,Redis从0开始计算,这里值为v2 Object result2 = listOps.index(1); // 从左边插入链表 listOps.leftPush("v0"); // 求链表长度 Long size = listOps.size(); // 求链表下标区间成员,整个链表下标范围为0到size-1,这里不取最后一个元素 List elements = listOps.range(0, size - 2); Map<String, Object> map = new HashMap<String, Object>(); map.put("success", true); return map; }(3)集合
在Redis里集合里的元素不允许重复,对于两个以上的集合,Redis还提供了交集、并集和差集的运算。
@RequestMapping("/set") @ResponseBody public Map<String, Object> testSet() { // 请注意:这里v1重复2次,由于集合不允许重复,所以只是插入5个成员到集合中 stringRedisTemplate.opsForSet().add("set1", "v1", "v1", "v2", "v3", "v4", "v5"); stringRedisTemplate.opsForSet().add("set2", "v2", "v4", "v6", "v8"); // 绑定set1集合操作 BoundSetOperations setOps = stringRedisTemplate.boundSetOps("set1"); // 增加两个元素 setOps.add("v6", "v7"); // 删除两个元素 setOps.remove("v1", "v7"); // 返回所有元素 Set set1 = setOps.members(); // 求成员数 Long size = setOps.size(); // 求交集 Set inter = setOps.intersect("set2"); // 求交集,并且用新集合inter保存 setOps.intersectAndStore("set2", "inter"); // 求差集 Set diff = setOps.diff("set2"); // 求差集,并且用新集合diff保存 setOps.diffAndStore("set2", "diff"); // 求并集 Set union = setOps.union("set2"); // 求并集,并且用新集合union保存 setOps.unionAndStore("set2", "union"); Map<String, Object> map = new HashMap<String, Object>(); map.put("success", true); return map; }(4)有序集合(zset)
有序集合和集合的差别不大,只是靠它在数据结构中增加一个属性——score(分数)得以支持。为了支持这个变化,Spring提供了TypeTuple接口,它定义了两个方法:getValue()和getScore(),在其接口设计中,value是保存有序集合的值,score则是保存分数,Redis使用分数来完成集合的排序。
@RequestMapping("/zset") @ResponseBody public Map<String, Object> testZset() { Set<TypedTuple<String>> typedTupleSet = new HashSet<>(); for (int i = 1; i <= 9; i++) { // 分数 double score = i * 0.1; // 创建一个TypedTuple对象,存入值和分数 TypedTuple<String> typedTuple = new DefaultTypedTuple<String>("value" + i, score); typedTupleSet.add(typedTuple); } // 往有序集合插入元素 stringRedisTemplate.opsForZSet().add("zset1", typedTupleSet); // 绑定zset1有序集合操作 BoundZSetOperations<String, String> zsetOps = stringRedisTemplate.boundZSetOps("zset1"); // 增加一个元素 zsetOps.add("value10", 0.26); Set<String> setRange = zsetOps.range(1, 6); // 按分数排序获取有序集合 Set<String> setScore = zsetOps.rangeByScore(0.2, 0.6); // 定义值范围 Range range = new Range(); range.gt("value3");// 大于value3 // range.gte("value3");// 大于等于value3 // range.lt("value8");// 小于value8 range.lte("value8");// 小于等于value8 // 按值排序,请注意这个排序是按字符串排序 Set<String> setLex = zsetOps.rangeByLex(range); // 删除元素 zsetOps.remove("value9", "value2"); // 求分数 Double score = zsetOps.score("value8"); // 在下标区间下,按分数排序,同时返回value和score Set<TypedTuple<String>> rangeSet = zsetOps.rangeWithScores(1, 6); // 在分数区间下,按分数排序,同时返回value和score Set<TypedTuple<String>> scoreSet = zsetOps.rangeByScoreWithScores(1, 6); // 按从大到小排序 Set<String> reverseSet = zsetOps.reverseRange(2, 8); Map<String, Object> map = new HashMap<String, Object>(); map.put("success", true); return map; }(1)使用Redis事务
exec命令的意义在于执行事务,只是它在队列命令执行前会判断被watch监控的Redis键的数据是否发生变化,如果发生变化,则取消事务,否则执行。
下面进行测试(接上面代码):
@RequestMapping("/multi") @ResponseBody public Map<String, Object> testMulti() { redisTemplate.opsForValue().set("key1", "value1"); List list = (List) redisTemplate.execute((RedisOperations operations) -> { // 设置要监控key1 operations.watch("key1"); // 开启事务,在exec命令执行前,全部都只是进入队列 operations.multi(); operations.opsForValue().set("key2", "value2"); operations.opsForValue().increment("key1", 1);// ① // 获取值将为null,因为redis只是把命令放入队列, Object value2 = operations.opsForValue().get("key2"); System.out.println("命令在队列,所以value为null【" + value2 + "】"); operations.opsForValue().set("key3", "value3"); Object value3 = operations.opsForValue().get("key3"); System.out.println("命令在队列,所以value为null【" + value3 + "】"); // 执行exec命令,将先判别key1是否在监控后被修改过,如果是不执行事务,否则执行事务 return operations.exec();// ② }); System.out.println(list); Map<String, Object> map = new HashMap<String, Object>(); map.put("success", true); return map; }只有在exec命令执行的时候,才能发现错误,对于出错的命令Redis只是报出错误,而错误后面的命令依旧会被执行。这就是Redis事务的特点。
(2)使用Redis流水线
只有需要执行SQL时,才一次性地发送所有地SQL去执行,大幅度提高执行命令的性能,这就是流水线技术。
下面是使用Redis流水线技术测试10万次读写的功能:
@RequestMapping("/pipeline") @ResponseBody public Map<String, Object> testPipeline() { Long start = System.currentTimeMillis(); List list = (List) redisTemplate.executePipelined((RedisOperations operations) -> { for (int i = 1; i <= 100000; i++) { operations.opsForValue().set("pipeline_" + i, "value_" + i); String value = (String) operations.opsForValue().get("pipeline_" + i); if (i == 100000) { System.out.println("命令只是进入队列,所以值为空【" + value + "】"); } } return null; }); Long end = System.currentTimeMillis(); System.out.println("耗时:" + (end - start) + "毫秒。"); Map<String, Object> map = new HashMap<String, Object>(); map.put("success", true); return map; }测试中,这10万次读写基本在300~600ms,大约平均值在400-500ms,速度很快。但需注意,运行如此多的命令,需要考虑内存空间的消耗,它最终会返回一个List对象,如果过多的命令执行返回的结果都保存在List中,会造成JVM内存溢出的异常,这个时候需要考虑使用迭代的方法执行Redis命令。
与事务一样,使用流水线的过程中,所有的命令也只是进入队列而没有执行,所以执行的返回值也是为空。
(3)使用Redis发布订阅
在 Redis 中,你可以设定对某一个 key 值进行消息发布及消息订阅,当一个 key 值上进行了消息发布后,所有订阅它的客户端都会收到相应的消息。这一功能最明显的用法就是用作实时消息系统,比如普通的即时聊天,群聊等功能。
(4)使用Lua脚本
执行Lua脚本在Redis中具备原子性,所以在需要保证数据一致性的高并发环境中,我们也可以使用Redis的Lua语言来保证数据的一致性且Lua脚本具备更加强大的运算能力,比使用Redis自身提供的事务要更好一些。
在Redis中有两种运行Lua脚本的方法,一种是直接发送Lua到Redis服务器去执行,另一种是先把Lua发送给Redis,Redis会对Lua脚本进行缓存,然后返回一个SHA1的32位编码回来,之后需要发送SHA1和相关参数给Redis便可执行。(因为Lua脚本很长,现实中的网络速度跟不上Redis的执行速度,如果采用32位编码和参数,需要传递的消息就会少了很多,极大的减少了网络传输的内容,进而提高系统的性能。)
为了支持Redis的Lua脚本,SPring提供了RedisScript接口,同时也有一个DefaultRedisScript实现类。先看RedisScript接口的源码:
public interface RedisScript<T>{ //获取脚本的Sha1 String getSha1(); //获取脚本返回值 Class<T> getResultTtype(); //获取脚本的字符串 String getScriptAsString(); }这里Spring会将Lua脚本发送到Redis服务器进行缓存,此时Redis服务器会返回一个32位的SHA1编码,这时候通过getSha1方法就会得到Redis返回的这个编码了;getResultType方法是获取Lua脚本返回的Java类型;getScriptAsString是返回脚本的字符串,以便观看脚本;
下面采用RedisScript接口执行一个Lua脚本,返回一个字符串:
@RequestMapping("/lua") @ResponseBody public Map<String, Object> testLua() { DefaultRedisScript<String> rs = new DefaultRedisScript<String>(); // 设置脚本 rs.setScriptText("return 'Hello Redis'"); // 定义返回类型,注意如果没有这个定义Spring不会返回结果 rs.setResultType(String.class); RedisSerializer<String> stringSerializer = redisTemplate.getStringSerializer(); // 执行Lua脚本 String str = (String) redisTemplate.execute(rs, stringSerializer, stringSerializer, null); Map<String, Object> map = new HashMap<String, Object>(); map.put("str", str); return map; }在RedisTemplate中,execute方法有两种执行脚本的方法,定义如下:
public <T> T execute(RedisScript<T> script ,List<K> keys, Object...args) public <T> T execute(RedisScript<T> script ,RedisSerializer<?> argsSerializer,RedisSerializer<?> resultSerializer, List<T> keys, Object...args)上面代码采用的是第二种方法。
下面考虑存在参数的情况,写一段Lua脚本用来判断两个字符串是否相同,代码如下:
@RequestMapping("/lua2") @ResponseBody public Map<String, Object> testLua2(String key1, String key2, String value1, String value2) { // 定义Lua脚本 String lua = " redis.call('set', KEYS[1], ARGV[1]) \n" + " redis.call('set', KEYS[2], ARGV[2]) \n" + " local str1 = redis.call('get', KEYS[1]) \n" + " local str2 = redis.call('get', KEYS[2]) \n" + " if str1 == str2 then \n" + "return 1 \n" + " end \n" + " return 0 \n"; System.out.println(lua); // 结果返回为Long DefaultRedisScript<Long> rs = new DefaultRedisScript<Long>(); rs.setScriptText(lua); rs.setResultType(Long.class); // 采用字符串序列化器 RedisSerializer<String> stringSerializer = redisTemplate.getStringSerializer(); // 定义key参数 List<String> keyList = new ArrayList<>(); keyList.add(key1); keyList.add(key2); // 传递两个参数值,其中第一个序列化器是key的序列化器,第二个序列化器是参数的序列化器 Long result = (Long) redisTemplate.execute(rs, stringSerializer, stringSerializer, keyList, value1, value2); Map<String, Object> map = new HashMap<String, Object>(); map.put("result", result); return map; }这里使用keyList保存各个键,然后通过execute方法传递,参数通过可变化的方式传递,且键和参数的序列化器都设置为了字符串序列化器。这里脚本返回一个数字(长整型Long)。
本节代码已上传Github: https://github.com/lizeyang18/SpringBoot-2.x/tree/master/chapter7
学习永不止步,继续加油~
