ikki@github.io:~$

基于redis 的延迟队列

基于Redis 的延迟队列

延迟队列

  • 延迟队列是一个业务需求。没有谁愿意在系统里阻塞一堆数据不去消费。

使用redis Zset 和 List 的相关特性实现

  • Redis ZSet 是一个有序集合, 每一个 Member 都可以添加一个Score, 数据会按照Score 进行排序
  • 将要消费的数据的时间戳作为Score, 添加到ZSet中
  • Redis List 是一个有序列表, 可以从头部(左边) 插入, 尾部(右边) 弹出
  • 将要消费的数据从 ZSet 中捞出, 添加到List 中, 消费者线程Pop List 中的数据并进行消费

搬迁: 从ZSet 到 List 中


// 实现一个分布式锁, 使用spring-boot-data-redis-luttuce 实现

boolean isLock = false;
String lockValue = UUID.randomUUID().toString();

try {
    //使用 setIfAbsent, 底层使用 SETNX 命令实现, 锁60s 后自动释放
    isLock = redisTemplate.opsForValue().setIfAbsent(lock, lockValue, 60, TimeUnit.SECONDS);
    // 上锁之后, 开始搬迁 业务key
    if(isLock) {
        long currentTimeMillis = System.currentTimeMillis() - 1_000L;
        Set<String> keys = redisTemplate.opsForZSet().rangeByScore(queue, 0, currentTimeMillis);
        if(null == keys || keys.isEmpty()) {
           // 如果没有key, 结束本次任务
           return;
        }

        long size = redisTemplate.opsForList().leftPushAll("List_Consume_Queue", keys);
        if(size > 0) {
           redisTemplate.opsForZSet().removeRangeByScore(queue, 0, currentTimeMillis);
        }
    }
} finally {
    String existValue = redisTemplate.opsForValue().get(lock);
    // 当获取到的锁是当前线程 加上的,  则进行释放
    if(isLock && lockValue.equals(existValue)) {
        boolean isUnlock = redisTemplate.delete(lock);
        if(isUnlock) {
           // log.debug("unlock: {} success", lock);
        } else {
           // log.error("release lock: {} failed", lock);
        }

    }

}


消费

// 这里可以看做是一个 死信队列


// pop 的数据不会被其他线程获取到
String key = redisTemplate.opsForList().rightPop("List_Consume_Queue");

if ( null == key) {
    return;
}

//..消费业务逻辑

  • 也可以使用 lua 脚本支持原子性操作的特性,直接从zset 里进行消费