基于redis 的延迟队列
基于Redis 的延迟队列
延迟队列
- 延迟队列是一个业务需求。没有谁愿意在系统里阻塞一堆数据不去消费。
使用redis Zset 和 List 的相关特性实现
- Redis ZSet 是一个有序集合, 每一个 Member 都可以添加一个Score, 数据会按照Score 进行排序
- 将要消费的数据的时间戳作为Score, 添加到ZSet中
- Redis List 是一个有序列表, 可以从头部(左边) 插入, 尾部(右边) 弹出
- 将要消费的数据从 ZSet 中捞出, 添加到List 中, 消费者线程Pop List 中的数据并进行消费
搬迁: 从ZSet 到 List 中
// 实现一个分布式锁, 使用spring-boot-data-redis-luttuce 实现
boolean isLock = false;
String lockValue = UUID.randomUUID().toString();
try {
//使用 setIfAbsent, 底层使用 SETNX 命令实现, 锁60s 后自动释放
isLock = redisTemplate.opsForValue().setIfAbsent(lock, lockValue, 60, TimeUnit.SECONDS);
// 上锁之后, 开始搬迁 业务key
if(isLock) {
long currentTimeMillis = System.currentTimeMillis() - 1_000L;
Set<String> keys = redisTemplate.opsForZSet().rangeByScore(queue, 0, currentTimeMillis);
if(null == keys || keys.isEmpty()) {
// 如果没有key, 结束本次任务
return;
}
long size = redisTemplate.opsForList().leftPushAll("List_Consume_Queue", keys);
if(size > 0) {
redisTemplate.opsForZSet().removeRangeByScore(queue, 0, currentTimeMillis);
}
}
} finally {
String existValue = redisTemplate.opsForValue().get(lock);
// 当获取到的锁是当前线程 加上的, 则进行释放
if(isLock && lockValue.equals(existValue)) {
boolean isUnlock = redisTemplate.delete(lock);
if(isUnlock) {
// log.debug("unlock: {} success", lock);
} else {
// log.error("release lock: {} failed", lock);
}
}
}
消费
// 这里可以看做是一个 死信队列
// pop 的数据不会被其他线程获取到
String key = redisTemplate.opsForList().rightPop("List_Consume_Queue");
if ( null == key) {
return;
}
//..消费业务逻辑
- 也可以使用 lua 脚本支持原子性操作的特性,直接从zset 里进行消费