分布式-延时任务方案分析

引言

在开发中,往往会遇到一些关于延时任务的需求。例如:

  • 生成订单60秒后,给用户发短信。
  • 生成订单30分钟未支付,则自动取消。

对上述的任务,我们给一个专业的名字来形容,那就是延时任务。那么这里就会产生一个问题,这个延时任务和定时任务的区别究竟在哪里呢?一共有如下几点区别:

  • 定时任务有明确的触发时间,延时任务没有。
  • 定时任务有执行周期,而延时任务在某事件触发后一段时间内执行,没有执行周期。
  • 定时任务一般执行的是批处理操作是多个任务,而延时任务一般是单个任务。

下面,我们以判断订单是否超时为例,进行方案分析。

方案分析

数据库轮询

思路

该方案通常是在小型项目中使用,即通过一个线程定时的去扫描数据库,通过订单时间来判断是否有超时的订单,然后进行update或delete等操作。

实现

使用spring集成的quartz就可以很好地实现这个功能,此处就不详述了。

优缺点

优点:简单易行,支持集群操作。
缺点:(1)对服务器内存消耗大。
   (2)存在延迟,比如你每隔3分钟扫描一次,那最坏的延迟时间就是3分钟。
   (3)假设你的订单有几千万条,每隔几分钟这样扫描一次,数据库损耗极大。

JDK的延迟队列

思路

该方案是利用JDK自带的DelayQueue来实现,这是一个无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素,放入DelayQueue中的对象,是必须实现Delayed接口的。
DelayedQueue实现工作流程如下图所示。

其中:

  • Poll():获取并移除队列的超时元素,没有则返回空。这种事非阻塞的。
  • take():获取并移除队列的超时元素,如果没有则wait当前线程,直到有元素满足超时条件,返回结果。这种事阻塞式的。

实现

定义一个类OrderDelay实现Delayed,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

@Slf4j
public class OrderDelay implements Delayed {

private String orderId;

//超时的时间点(ms时间戳)
private long timeoutStamp;

public OrderDelay(String orderId, long timeoutMs) {
this.orderId = orderId;
this.timeoutStamp = timeoutMs + System.currentTimeMillis();
}

/**
* 对于给定的TimeUnit,距离你自定义的超时截止时间点还有多久
*
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(timeoutStamp - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

/**
* 比较两个对象谁更快要到延迟的时间截止点。
* <p>
* 因为Delayed对象是可以放到DelayQueue队列里面的,但放到的时候不是依次放入队列,而是通过compareTo来判断放入的位置,
* 这样从队列取元素的时候,总是拉取到最快要达到时间截止点的数据。
*
* @param other
* @return
*/
@Override
public int compareTo(Delayed other) {
if (other == this) {
return 0;
}
if (!(other instanceof OrderDelay)) {
throw new IllegalArgumentException();
}
OrderDelay t = (OrderDelay) other;
long d = (getDelay(TimeUnit.MILLISECONDS) - t.getDelay(TimeUnit.MILLISECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}


public void dealTimeOutOrder() {
log.info("处理超时的订单|订单号={}", orderId);
}

}

对超时时间设置为3s进行测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Test
public void orderDelayTest() throws InterruptedException {
//声明一个延时队列
DelayQueue<OrderDelay> queue = new DelayQueue<>();

//启动一个消费者从延时队列里不断地拉取元素
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
OrderDelay orderDelay = queue.take();
orderDelay.dealTimeOutOrder();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "延时队列消费线程").start();

//将订单延时任务放入队列
for (int i = 1; i <= 5; i++) {
queue.put(new OrderDelay(String.valueOf(i), TimeUnit.MILLISECONDS.convert(3, TimeUnit.SECONDS)));
log.info("将订单延时任务放入队列|orderId={}", i);

//随机休眠几秒
TimeUnit.SECONDS.sleep(new Random().nextInt(4));
}

//主线程睡眠,等待消费线程拉取任务完毕
TimeUnit.SECONDS.sleep(20L);
}

运行测试方法打印结果如下:

1
2
3
4
5
6
7
8
9
10
14:05:33.422 [main] INFO com.lzumetal.multithread.queue.DelayQueueTest - 将订单延时任务放入队列|orderId=1
14:05:36.422 [延时队列消费线程] INFO com.lzumetal.multithread.queue.OrderDelay - 处理超时的订单|订单号=1
14:05:36.427 [main] INFO com.lzumetal.multithread.queue.DelayQueueTest - 将订单延时任务放入队列|orderId=2
14:05:38.427 [main] INFO com.lzumetal.multithread.queue.DelayQueueTest - 将订单延时任务放入队列|orderId=3
14:05:39.427 [延时队列消费线程] INFO com.lzumetal.multithread.queue.OrderDelay - 处理超时的订单|订单号=2
14:05:41.428 [main] INFO com.lzumetal.multithread.queue.DelayQueueTest - 将订单延时任务放入队列|orderId=4
14:05:41.428 [延时队列消费线程] INFO com.lzumetal.multithread.queue.OrderDelay - 处理超时的订单|订单号=3
14:05:43.429 [main] INFO com.lzumetal.multithread.queue.DelayQueueTest - 将订单延时任务放入队列|orderId=5
14:05:44.429 [延时队列消费线程] INFO com.lzumetal.multithread.queue.OrderDelay - 处理超时的订单|订单号=4
14:05:46.429 [延时队列消费线程] INFO com.lzumetal.multithread.queue.OrderDelay - 处理超时的订单|订单号=5

可以看到订单3秒超时后就被处理。

优缺点

优点:效率高,任务触发时间延迟低。
缺点:(1)服务器重启后,数据全部消失,怕宕机。
   (2)集群扩展比较麻烦。
   (3)因为内存条件限制的原因,比如下单未付款的订单数太多,那么容易容易就出现OOM异常。
   (4)代码复杂度较高。

时间轮算法

思路

先上一张时间轮的图

时间轮算法可以类比于时钟,如上图箭头(指针)按某一个方向按固定频率轮动,每一次跳动称为一个 tick。这样可以看出定时轮由个3个重要的属性参数,ticksPerWheel(一轮的tick数),tickDuration(一个tick的持续时间)以及 timeUnit(时间单位),例如当ticksPerWheel=60,tickDuration=1,timeUnit=秒,这就和现实中的始终的秒针走动完全类似了。
如果当前指针指在1上面,我有一个任务需要4秒以后执行,那么这个执行的线程回调或者消息将会被放在5上。那如果需要在20秒之后执行怎么办,由于这个环形结构槽数只到8,如果要20秒,指针需要多转2圈。位置是在2圈之后的5上面(20 % 8 + 1)。

实现

我们用Netty的HashedWheelTimer来实现,给Pom加上下面的依赖:

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.24.Final</version>
</dependency>

测试代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class WheelTimerTest {

static class MyTimerTask implements TimerTask {

boolean flag;

public MyTimerTask(boolean flag) {
this.flag = flag;
}

public void run(Timeout timeout) throws Exception {
System.out.println("处理超时的订单");
this.flag = false;
}
}

public static void main(String[] argv) {
MyTimerTask timerTask = new MyTimerTask(true);
Timer timer = new HashedWheelTimer();
timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);
int i = 1;
while (timerTask.flag) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(i + "秒过去了");
i++;
}
}

}

打印结果:

1
2
3
4
5
6
7
1秒过去了
2秒过去了
3秒过去了
4秒过去了
5秒过去了
处理超时的订单
6秒过去了

优缺点

优点:效率高,任务触发时间延迟时间比delayQueue低,代码复杂度比delayQueue低。
缺点:(1)服务器重启后,数据全部消失,怕宕机。
   (2)集群扩展比较麻烦。
   (3)因为内存条件限制的原因,比如下单未付款的订单数太多,容易就出现OOM异常。

Redis有序集合(zset)

利用redis的zset,zset是一个有序集合,每一个元素(member)都关联了一个score,通过score排序来取集合中的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
redis模拟
127.0.0.1:6379> zadd cron 10001 task1
(integer) 1
127.0.0.1:6379> zadd cron 9001 task2
(integer) 1
127.0.0.1:6379> zadd cron 29001 task3
(integer) 1
127.0.0.1:6379> ZRANGE cron 0 -1 withscores
1) "task2"
2) "9001"
3) "task1"
4) "10001"
5) "task3"
6) "29001"

假设当前的时间戳是 15000

127.0.0.1:6379> ZRANGEBYSCORE cron -inf 15000
1) "task2"
2) "task1"
127.0.0.1:6379> ZREM cron task2
(integer) 1
127.0.0.1:6379> ZREM cron task1
(integer) 1
127.0.0.1:6379> ZRANGE cron 0 -1 withscores
1) "task3"
2) "29001"

思路

使用sorted Sets的自动排序, key 为任务id,score 为任务计划执行的时间戳,这样任务在加入sets的时候已经按时间排序,另启一个定时任务每隔1s(或者其他间隔)去取出sets顶部的数据,小于当前时间的可以通过pop取出来然后去执行。

代码实现示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RedisMain.class)
@Slf4j
public class DelayTaskTest {


@Autowired
private StringRedisTemplate redisTemplate;


@Test
public void orderDelayTaskTest() throws InterruptedException {
final String key = "test_dalay_order_id";
//启动一个消费线程,从redis拉取订单
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
long currentMillis = System.currentTimeMillis();
BoundZSetOperations<String, String> ops = redisTemplate.boundZSetOps(key);
Set<String> list = ops.rangeByScore(0, currentMillis);
if (!CollectionUtils.isEmpty(list)) {
for (String id : list) {
//先移除,移除成功再处理
Long remove = ops.remove(id);
//remove大于0表示移除成功
if (remove != null && remove > 0) {
log.info("处理超时的订单|订单id={}|maxScore={}", id, currentMillis);
}
}
} else {
log.info("当前没有需要处理的超时订单|maxScore={}", currentMillis);
}
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();

//生成5个订单保存到redis
BoundZSetOperations<String, String> ops = redisTemplate.boundZSetOps(key);
for (int i = 1; i <= 5; i++) {
long score = System.currentTimeMillis() + 3 * 1000;
ops.add(String.valueOf(i), score);
log.info("订单保存至redis的zset中|id={}|score={}", i, score);
TimeUnit.SECONDS.sleep(new Random().nextInt(3));
}

//等待消费线程消费完毕
TimeUnit.SECONDS.sleep(12L);
}

}

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2021-11-11 19:05:43.683  INFO 15848 --- [       Thread-3] c.l.springboot.redis.test.DelayTaskTest  : 当前没有需要处理的超时订单|maxScore=1636628740998
2021-11-11 19:05:43.686 INFO 15848 --- [ main] c.l.springboot.redis.test.DelayTaskTest : 订单保存至redis的zset中|id=1|score=1636628744002
2021-11-11 19:05:43.690 INFO 15848 --- [ main] c.l.springboot.redis.test.DelayTaskTest : 订单保存至redis的zset中|id=2|score=1636628746686
2021-11-11 19:05:43.694 INFO 15848 --- [ main] c.l.springboot.redis.test.DelayTaskTest : 订单保存至redis的zset中|id=3|score=1636628746690
2021-11-11 19:05:43.698 INFO 15848 --- [ main] c.l.springboot.redis.test.DelayTaskTest : 订单保存至redis的zset中|id=4|score=1636628746695
2021-11-11 19:05:43.701 INFO 15848 --- [ main] c.l.springboot.redis.test.DelayTaskTest : 订单保存至redis的zset中|id=5|score=1636628746698
2021-11-11 19:05:44.699 INFO 15848 --- [ Thread-3] c.l.springboot.redis.test.DelayTaskTest : 处理超时的订单|订单id=1|maxScore=1636628744684
2021-11-11 19:05:45.708 INFO 15848 --- [ Thread-3] c.l.springboot.redis.test.DelayTaskTest : 当前没有需要处理的超时订单|maxScore=1636628745700
2021-11-11 19:05:46.726 INFO 15848 --- [ Thread-3] c.l.springboot.redis.test.DelayTaskTest : 处理超时的订单|订单id=2|maxScore=1636628746709
2021-11-11 19:05:46.734 INFO 15848 --- [ Thread-3] c.l.springboot.redis.test.DelayTaskTest : 处理超时的订单|订单id=3|maxScore=1636628746709
2021-11-11 19:05:46.745 INFO 15848 --- [ Thread-3] c.l.springboot.redis.test.DelayTaskTest : 处理超时的订单|订单id=4|maxScore=1636628746709
2021-11-11 19:05:46.750 INFO 15848 --- [ Thread-3] c.l.springboot.redis.test.DelayTaskTest : 处理超时的订单|订单id=5|maxScore=1636628746709
2021-11-11 19:05:47.760 INFO 15848 --- [ Thread-3] c.l.springboot.redis.test.DelayTaskTest : 当前没有需要处理的超时订单|maxScore=1636628747751
2021-11-11 19:05:48.777 INFO 15848 --- [ Thread-3] c.l.springboot.redis.test.DelayTaskTest : 当前没有需要处理的超时订单|maxScore=1636628748761
2021-11-11 19:05:49.783 INFO 15848 --- [ Thread-3] c.l.springboot.redis.test.DelayTaskTest : 当前没有需要处理的超时订单|maxScore=1636628749778

优缺点

优点:实现比较简单,且因为有redis做数据持久化重启服务并不会导致数据丢失。
缺点:(1)需要额外引入redis服务并保证redis高可用性。
(2)多节点的服务最好使用分布式锁。当然,我们在代码中已经有了“先移除,移除成功再处理”的逻辑,所以分布式锁并不是必须的。

Redis的key过期通知

思路

从Reids 2.8 有一个键空间通知的机制Keyspace Notifications,强烈建议了解一下), 允许客户端去订阅一些key的事件,其中就有 key过期的事件,我们可以把key名称设置为task的id等标识(这种方式value的值无法取到,所以只用key来识别任务),expire设置为计划要执行的时间,然后开启一个客户端来订阅消息过期事件,然后处理task。

演示

首先要确认下Redis版本是2.8以上,然后在redis.conf中,加入一条配置:

1
notify-keyspace-events Ex

重启redis,开启一个窗口,订阅key过期时间:

1
2
3
4
5
root@server01 # ./redis-cli --csv psubscribe '__keyevent@0__:expired'
Reading messages... (press Ctrl-C to quit)
"psubscribe","__keyevent@0__:expired",1
"pmessage","__keyevent@0__:expired","__keyevent@0__:expired","task1"
"pmessage","__keyevent@0__:expired","__keyevent@0__:expired","task2"

第二个窗口,设置key:

1
2
3
4
5
6
7
8
127.0.0.1:6379> set task1 xx
OK
127.0.0.1:6379> EXPIRE task1 5
(integer) 1
127.0.0.1:6379> set task2 xx
OK
127.0.0.1:6379> EXPIREAT task2 1469525560
(integer) 1

当key过期的时候就看到第一个窗口的通知了,订阅的key keyevent@:expired 这个格式是固定的,db代表的是数据库的编号,由于订阅开启之后这个库的所有key过期时间都会被推送过来,所以最好单独使用一个数据库来进行隔离。

java代码要怎么写呢?以SpringBoot项目来说,首先要添加依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

增加配置类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;


@Configuration
public class RedisListenerConfig {

@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}

}

创建监听:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;


@Component
@Slf4j
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {

public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}

public void onMessage(Message message, byte[] pattern) {
String expiredKey = message.toString(); // 获取过期的key
if (expiredKey.contains("TIME_OUT_KEY")) { // 判断是否是想要监听的过期key
log.info("redis key过期:{}", expiredKey);
// TODO 业务逻辑
}
}

}

优缺点

优点:实现比较简单,且支持分布式服务。
缺点:(1)对redis有侵入,必须要修改redis配置。
(2)如果服务与redis断连了一段时间,断连期间的失效事件都丢失了。

------ 本文完 ------