引言
在开发中,往往会遇到一些关于延时任务的需求。例如:
- 生成订单60秒后,给用户发短信。
- 生成订单30分钟未支付,则自动取消。
对上述的任务,我们给一个专业的名字来形容,那就是延时任务。那么这里就会产生一个问题,这个延时任务和定时任务的区别究竟在哪里呢?一共有如下几点区别:
- 定时任务有明确的触发时间,延时任务没有。
- 定时任务有执行周期,而延时任务在某事件触发后一段时间内执行,没有执行周期。
- 定时任务一般执行的是批处理操作是多个任务,而延时任务一般是单个任务。
下面,我们以判断订单是否超时为例,进行方案分析。
方案分析
数据库轮询
思路
该方案通常是在小型项目中使用,即通过一个线程定时的去扫描数据库,通过订单时间来判断是否有超时的订单,然后进行update或delete等操作。
实现
使用spring集成的quartz就可以很好地实现这个功能,此处就不详述了。
优缺点
优点:简单易行,支持集群操作。
缺点:(1)对服务器内存消耗大。
(2)存在延迟,比如你每隔3分钟扫描一次,那最坏的延迟时间就是3分钟。
(3)假设你的订单有几千万条,每隔几分钟这样扫描一次,数据库损耗极大。
JDK的延迟队列
思路
该方案是利用JDK自带的DelayQueue来实现,这是一个无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素,放入DelayQueue中的对象,是必须实现Delayed接口的。
DelayedQueue实现工作流程如下图所示。
其中:
Poll():获取并移除队列的超时元素,没有则返回空。这种事非阻塞的。take():获取并移除队列的超时元素,如果没有则wait当前线程,直到有元素满足超时条件,返回结果。这种事阻塞式的。
实现
定义一个类OrderDelay实现Delayed,代码如下:
1 |
|
对超时时间设置为3s进行测试:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public void orderDelayTest() throws InterruptedException {
//声明一个延时队列
DelayQueue<OrderDelay> queue = new DelayQueue<>();
//启动一个消费者从延时队列里不断地拉取元素
new Thread(new Runnable() {
public void run() {
while (true) {
try {
OrderDelay orderDelay = queue.take();
orderDelay.dealTimeOutOrder();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "延时队列消费线程").start();
//将订单延时任务放入队列
for (int i = 1; i <= 5; i++) {
queue.put(new OrderDelay(String.valueOf(i), TimeUnit.MILLISECONDS.convert(3, TimeUnit.SECONDS)));
log.info("将订单延时任务放入队列|orderId={}", i);
//随机休眠几秒
TimeUnit.SECONDS.sleep(new Random().nextInt(4));
}
//主线程睡眠,等待消费线程拉取任务完毕
TimeUnit.SECONDS.sleep(20L);
}
运行测试方法打印结果如下:1
2
3
4
5
6
7
8
9
1014:05:33.422 [main] INFO com.lzumetal.multithread.queue.DelayQueueTest - 将订单延时任务放入队列|orderId=1
14:05:36.422 [延时队列消费线程] INFO com.lzumetal.multithread.queue.OrderDelay - 处理超时的订单|订单号=1
14:05:36.427 [main] INFO com.lzumetal.multithread.queue.DelayQueueTest - 将订单延时任务放入队列|orderId=2
14:05:38.427 [main] INFO com.lzumetal.multithread.queue.DelayQueueTest - 将订单延时任务放入队列|orderId=3
14:05:39.427 [延时队列消费线程] INFO com.lzumetal.multithread.queue.OrderDelay - 处理超时的订单|订单号=2
14:05:41.428 [main] INFO com.lzumetal.multithread.queue.DelayQueueTest - 将订单延时任务放入队列|orderId=4
14:05:41.428 [延时队列消费线程] INFO com.lzumetal.multithread.queue.OrderDelay - 处理超时的订单|订单号=3
14:05:43.429 [main] INFO com.lzumetal.multithread.queue.DelayQueueTest - 将订单延时任务放入队列|orderId=5
14:05:44.429 [延时队列消费线程] INFO com.lzumetal.multithread.queue.OrderDelay - 处理超时的订单|订单号=4
14:05:46.429 [延时队列消费线程] INFO com.lzumetal.multithread.queue.OrderDelay - 处理超时的订单|订单号=5
可以看到订单3秒超时后就被处理。
优缺点
优点:效率高,任务触发时间延迟低。
缺点:(1)服务器重启后,数据全部消失,怕宕机。
(2)集群扩展比较麻烦。
(3)因为内存条件限制的原因,比如下单未付款的订单数太多,那么容易容易就出现OOM异常。
(4)代码复杂度较高。
时间轮算法
思路
先上一张时间轮的图
时间轮算法可以类比于时钟,如上图箭头(指针)按某一个方向按固定频率轮动,每一次跳动称为一个 tick。这样可以看出定时轮由个3个重要的属性参数,ticksPerWheel(一轮的tick数),tickDuration(一个tick的持续时间)以及 timeUnit(时间单位),例如当ticksPerWheel=60,tickDuration=1,timeUnit=秒,这就和现实中的始终的秒针走动完全类似了。
如果当前指针指在1上面,我有一个任务需要4秒以后执行,那么这个执行的线程回调或者消息将会被放在5上。那如果需要在20秒之后执行怎么办,由于这个环形结构槽数只到8,如果要20秒,指针需要多转2圈。位置是在2圈之后的5上面(20 % 8 + 1)。
实现
我们用Netty的HashedWheelTimer来实现,给Pom加上下面的依赖:1
2
3
4
5<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.24.Final</version>
</dependency>
测试代码示例如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33public class WheelTimerTest {
static class MyTimerTask implements TimerTask {
boolean flag;
public MyTimerTask(boolean flag) {
this.flag = flag;
}
public void run(Timeout timeout) throws Exception {
System.out.println("处理超时的订单");
this.flag = false;
}
}
public static void main(String[] argv) {
MyTimerTask timerTask = new MyTimerTask(true);
Timer timer = new HashedWheelTimer();
timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);
int i = 1;
while (timerTask.flag) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(i + "秒过去了");
i++;
}
}
}
打印结果:1
2
3
4
5
6
71秒过去了
2秒过去了
3秒过去了
4秒过去了
5秒过去了
处理超时的订单
6秒过去了
优缺点
优点:效率高,任务触发时间延迟时间比delayQueue低,代码复杂度比delayQueue低。
缺点:(1)服务器重启后,数据全部消失,怕宕机。
(2)集群扩展比较麻烦。
(3)因为内存条件限制的原因,比如下单未付款的订单数太多,容易就出现OOM异常。
Redis有序集合(zset)
利用redis的zset,zset是一个有序集合,每一个元素(member)都关联了一个score,通过score排序来取集合中的值。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27redis模拟
127.0.0.1:6379> zadd cron 10001 task1
(integer) 1
127.0.0.1:6379> zadd cron 9001 task2
(integer) 1
127.0.0.1:6379> zadd cron 29001 task3
(integer) 1
127.0.0.1:6379> ZRANGE cron 0 -1 withscores
1) "task2"
2) "9001"
3) "task1"
4) "10001"
5) "task3"
6) "29001"
假设当前的时间戳是 15000
127.0.0.1:6379> ZRANGEBYSCORE cron -inf 15000
1) "task2"
2) "task1"
127.0.0.1:6379> ZREM cron task2
(integer) 1
127.0.0.1:6379> ZREM cron task1
(integer) 1
127.0.0.1:6379> ZRANGE cron 0 -1 withscores
1) "task3"
2) "29001"
思路
使用sorted Sets的自动排序, key 为任务id,score 为任务计划执行的时间戳,这样任务在加入sets的时候已经按时间排序,另启一个定时任务每隔1s(或者其他间隔)去取出sets顶部的数据,小于当前时间的可以通过pop取出来然后去执行。
代码实现示例
1 | (SpringRunner.class) |
输出结果:1
2
3
4
5
6
7
8
9
10
11
12
13
14
152021-11-11 19:05:43.683 INFO 15848 --- [ Thread-3] c.l.springboot.redis.test.DelayTaskTest : 当前没有需要处理的超时订单|maxScore=1636628740998
2021-11-11 19:05:43.686 INFO 15848 --- [ main] c.l.springboot.redis.test.DelayTaskTest : 订单保存至redis的zset中|id=1|score=1636628744002
2021-11-11 19:05:43.690 INFO 15848 --- [ main] c.l.springboot.redis.test.DelayTaskTest : 订单保存至redis的zset中|id=2|score=1636628746686
2021-11-11 19:05:43.694 INFO 15848 --- [ main] c.l.springboot.redis.test.DelayTaskTest : 订单保存至redis的zset中|id=3|score=1636628746690
2021-11-11 19:05:43.698 INFO 15848 --- [ main] c.l.springboot.redis.test.DelayTaskTest : 订单保存至redis的zset中|id=4|score=1636628746695
2021-11-11 19:05:43.701 INFO 15848 --- [ main] c.l.springboot.redis.test.DelayTaskTest : 订单保存至redis的zset中|id=5|score=1636628746698
2021-11-11 19:05:44.699 INFO 15848 --- [ Thread-3] c.l.springboot.redis.test.DelayTaskTest : 处理超时的订单|订单id=1|maxScore=1636628744684
2021-11-11 19:05:45.708 INFO 15848 --- [ Thread-3] c.l.springboot.redis.test.DelayTaskTest : 当前没有需要处理的超时订单|maxScore=1636628745700
2021-11-11 19:05:46.726 INFO 15848 --- [ Thread-3] c.l.springboot.redis.test.DelayTaskTest : 处理超时的订单|订单id=2|maxScore=1636628746709
2021-11-11 19:05:46.734 INFO 15848 --- [ Thread-3] c.l.springboot.redis.test.DelayTaskTest : 处理超时的订单|订单id=3|maxScore=1636628746709
2021-11-11 19:05:46.745 INFO 15848 --- [ Thread-3] c.l.springboot.redis.test.DelayTaskTest : 处理超时的订单|订单id=4|maxScore=1636628746709
2021-11-11 19:05:46.750 INFO 15848 --- [ Thread-3] c.l.springboot.redis.test.DelayTaskTest : 处理超时的订单|订单id=5|maxScore=1636628746709
2021-11-11 19:05:47.760 INFO 15848 --- [ Thread-3] c.l.springboot.redis.test.DelayTaskTest : 当前没有需要处理的超时订单|maxScore=1636628747751
2021-11-11 19:05:48.777 INFO 15848 --- [ Thread-3] c.l.springboot.redis.test.DelayTaskTest : 当前没有需要处理的超时订单|maxScore=1636628748761
2021-11-11 19:05:49.783 INFO 15848 --- [ Thread-3] c.l.springboot.redis.test.DelayTaskTest : 当前没有需要处理的超时订单|maxScore=1636628749778
优缺点
优点:实现比较简单,且因为有redis做数据持久化重启服务并不会导致数据丢失。
缺点:(1)需要额外引入redis服务并保证redis高可用性。
(2)多节点的服务最好使用分布式锁。当然,我们在代码中已经有了“先移除,移除成功再处理”的逻辑,所以分布式锁并不是必须的。
Redis的key过期通知
思路
从Reids 2.8 有一个键空间通知的机制(Keyspace Notifications,强烈建议了解一下), 允许客户端去订阅一些key的事件,其中就有 key过期的事件,我们可以把key名称设置为task的id等标识(这种方式value的值无法取到,所以只用key来识别任务),expire设置为计划要执行的时间,然后开启一个客户端来订阅消息过期事件,然后处理task。
演示
首先要确认下Redis版本是2.8以上,然后在redis.conf中,加入一条配置:1
notify-keyspace-events Ex
重启redis,开启一个窗口,订阅key过期时间:1
2
3
4
5root@server01 # ./redis-cli --csv psubscribe '__keyevent@0__:expired'
Reading messages... (press Ctrl-C to quit)
"psubscribe","__keyevent@0__:expired",1
"pmessage","__keyevent@0__:expired","__keyevent@0__:expired","task1"
"pmessage","__keyevent@0__:expired","__keyevent@0__:expired","task2"
第二个窗口,设置key:1
2
3
4
5
6
7
8127.0.0.1:6379> set task1 xx
OK
127.0.0.1:6379> EXPIRE task1 5
(integer) 1
127.0.0.1:6379> set task2 xx
OK
127.0.0.1:6379> EXPIREAT task2 1469525560
(integer) 1
当key过期的时候就看到第一个窗口的通知了,订阅的key keyevent@
java代码要怎么写呢?以SpringBoot项目来说,首先要添加依赖:1
2
3
4<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
增加配置类:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
public class RedisListenerConfig {
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}
创建监听:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
4j
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
public void onMessage(Message message, byte[] pattern) {
String expiredKey = message.toString(); // 获取过期的key
if (expiredKey.contains("TIME_OUT_KEY")) { // 判断是否是想要监听的过期key
log.info("redis key过期:{}", expiredKey);
// TODO 业务逻辑
}
}
}
优缺点
优点:实现比较简单,且支持分布式服务。
缺点:(1)对redis有侵入,必须要修改redis配置。
(2)如果服务与redis断连了一段时间,断连期间的失效事件都丢失了。