Redisson中定义了分布式延迟队列RDelayedQueue,这是一种基于我们前面介绍过的zset结构实现的延时队列,它允许以指定的延迟时长将元素放到目标队列中。
其实就是在zset的基础上增加了一个基于内存的延迟队列。当我们要添加一个数据到延迟队列的时候,redisson会把数据+超时时间放到zset中,并且起一个延时任务,当任务到期的时候,再去zset中把数据取出来,返回给客户端使用。
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>最新版</version>
</dependency>
定义一个Redisson客户端:
/**
* @author Hollis
*/
@Configuration
public class RedissonConfig {
@Bean(destroyMethod="shutdown")
public RedissonClient redisson() throws IOException {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);
return redisson;
}
}
接下来,在想要使用延迟队列的地方做如下方式:
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;
@Component
public class RedissonOrderDelayQueue {
@Autowired
RedissonClient redisson;
public void addTaskToDelayQueue(String orderId) {
RBlockingDeque<String> blockingDeque = redisson.getBlockingDeque("orderQueue");
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingDeque);
System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + "添加任务到延时队列里面");
delayedQueue.offer(orderId, 3, TimeUnit.SECONDS);
delayedQueue.offer(orderId, 6, TimeUnit.SECONDS);
delayedQueue.offer(orderId, 9, TimeUnit.SECONDS);
}
public String getOrderFromDelayQueue() {
RBlockingDeque<String> blockingDeque = redisson.getBlockingDeque("orderQueue");
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingDeque);
String orderId = blockingDeque.take();
return orderId;
}
}
使用offer方法将两条延迟消息添加到RDelayedQueue中,使用take方法从RQueue中获取消息,如果没有消息可用,该方法会阻塞等待,直到消息到达。
我们使用 RDelayedQueue 的 offer 方法将元素添加到延迟队列,并指定延迟的时间。当元素的延迟时间到达时,Redisson 会将元素从 RDelayedQueue 转移到关联的 RBlockingDeque 中。
使用 RBlockingDeque 的 take 方法从关联的 RBlockingDeque 中获取元素。这是一个阻塞操作,如果没有元素可用,它会等待直到有元素可用。
所以,为了从延迟队列中取出元素,使用 RBlockingDeque 的 take 方法,因为 Redisson 的 RDelayedQueue 实际上是通过转移元素到关联的 RBlockingDeque 来实现延迟队列的。