Categories
程式開發

延時任務的幾種實現方式


一、應用場景

在需求開發過程中,我們經常會遇到一些類似下面的場景:

a. 外賣訂單超過15分鐘未支付,自動取消b. 使用搶票軟件訂到車票後,1小時內未支付,自動取消c. 待處理申請超時1天,通知審核人員經理,超時2天通知審核人員總監d. 客戶預定自如房子後,24小時內未支付,房源自動釋放

延時任務的幾種實現方式 1

那麼針對這類場景的需求應該如果實現呢,我們最先想到的一般是啟個定時任務,來掃描數據庫裡符合條件的數據,並對其進行更新操作。一般來說spring-quartz 、elasticjob 就可以實現,甚至自己寫個 Timer 也可以。但是這種方式有個弊端,就是需要不停的掃描數據庫,如果數據量比較大,並且任務執行間隔時間比較短,對數據庫會有一定的壓力。另外定時任務的執行間隔時間的粒度也不太好設置,設置長會影響時效性,設置太短又會增加服務壓力。我們來看一下有沒有更好的實現方式。

二、JDK 延時隊列實現

​DelayQueue 是 JDK 中 java.util.concurrent 包下的一種無界阻塞隊列,底層是優先隊列 PriorityQueue。對於放到隊列中的任務,可以按照到期時間進行排序,只需要取已經到期的元素處理即可。

具體的步驟是,要放入隊列的元素需要實現 Delayed 接口並實現 getDelay 方法來計算到期時間,compare 方法來對比到期時間以進行排序。一個簡單的使用例子如下:

package com.lyqiang.delay.jdk;

import java.time.LocalDateTime;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
* @author lyqiang
*/
public class TestDelayQueue {

public static void main(String[] args) throws InterruptedException {

// 新建3个任务,并依次设置超时时间为 20s 10s 30s
DelayTask d1 = new DelayTask(1, System.currentTimeMillis() + 20000L);
DelayTask d2 = new DelayTask(2, System.currentTimeMillis() + 10000L);
DelayTask d3 = new DelayTask(3, System.currentTimeMillis() + 30000L);

DelayQueue queue = new DelayQueue();
queue.add(d1);
queue.add(d2);
queue.add(d3);
int size = queue.size();

System.out.println("当前时间是:" + LocalDateTime.now());

// 从延时队列中获取元素, 将输出 d2 、d1 、d3
for (int i = 0; i < size; i++) { System.out.println(queue.take() + " ------ " + LocalDateTime.now()); } } } class DelayTask implements Delayed { private Integer taskId; private long exeTime; DelayTask(Integer taskId, long exeTime) { this.taskId = taskId; this.exeTime = exeTime; } @Override public long getDelay(TimeUnit unit) { return exeTime - System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { DelayTask t = (DelayTask) o; if (this.exeTime - t.exeTime <= 0) { return -1; } else { return 1; } } @Override public String toString() { return "DelayTask{" + "taskId=" + taskId + ", exeTime=" + exeTime + '}'; } }

代碼的執行結果如下:

延時任務的幾種實現方式 2

使用DelayQueue, 只需要有一個線程不斷從隊列中獲取數據即可,它的優點是不用引入第三方依賴,實現也很簡單,缺點也很明顯,它是內存存儲,對分佈式支持不友好,如果發生單點故障,可能會造成數據丟失,無界隊列還存在OOM 的風險。

三、時間輪算法實現

1996 年 George Varghese 和 Tony Lauck 的論文《Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facility》中提出了一種時間輪管理 Timeout 事件的方式。其設計非常巧妙,並且類似時鐘的運行,如下圖的原始時間輪有8 個格子,假定指針經過每個格子花費時間是1 個時間單位,當前指針指向0,一個17 個時間單位後超時的任務則需要運轉2 圈再通過一個格子後被執行,放在相同格子的任務會形成一個鍊錶。

延時任務的幾種實現方式 3

Netty 包裡提供了一種時間輪的實現——HashedWheelTimer,其底層使用了數組+鍊錶的數據結構,使用方式如下

package com.lyqiang.delay.wheeltimer;

import io.netty.util.HashedWheelTimer;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

/**
* @author lyqiang
*/
public class WheelTimerTest {

public static void main(String[] args) {

//设置每个格子是 100ms, 总共 256 个格子
HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 256);

//加入三个任务,依次设置超时时间是 10s 5s 20s

System.out.println("加入一个任务,ID = 1, time= " + LocalDateTime.now());
hashedWheelTimer.newTimeout(timeout -> {
System.out.println("执行一个任务,ID = 1, time= " + LocalDateTime.now());
}, 10, TimeUnit.SECONDS);

System.out.println("加入一个任务,ID = 2, time= " + LocalDateTime.now());
hashedWheelTimer.newTimeout(timeout -> {
System.out.println("执行一个任务,ID = 2, time= " + LocalDateTime.now());
}, 5, TimeUnit.SECONDS);

System.out.println("加入一个任务,ID = 3, time= " + LocalDateTime.now());
hashedWheelTimer.newTimeout(timeout -> {
System.out.println("执行一个任务,ID = 3, time= " + LocalDateTime.now());
}, 20, TimeUnit.SECONDS);

System.out.println("等待任务执行===========");
}
}

代碼執行結果如下:

延時任務的幾種實現方式 4

相比 DelayQueue 的數據結構,時間輪在算法複雜度上有一定優勢,但用時間輪來實現延時任務同樣避免不了單點故障。

四、Redis ZSet 實現

Redis 裡有5 種數據結構,最常用的是String 和Hash,而ZSet 是一種支持按score 排序的數據結構,每個元素都會關聯一個double 類型的分數,Redis 通過分數來為集合中的成員進行從小到大的排序,借助這個特性我們可以把超時時間作為score 來將任務進行排序。

​使用 zadd key score member 命令向 redis 中放入任務,超時時間作為 score, 任務 ID 作為 member, 使用 zrange key start stop withscores 命令從 redis 中讀取任務,使用 zrem key member 命令從 redis 中刪除任務。代碼如下:

package com.lyqiang.delay.redis;

import java.time.LocalDateTime;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* @author lyqiang
*/
public class TestRedisDelay {

public static void main(String[] args) {

TaskProducer taskProducer = new TaskProducer();
//创建 3个任务,并设置超时间为 10s 5s 20s
taskProducer.produce(1, System.currentTimeMillis() + 10000);
taskProducer.produce(2, System.currentTimeMillis() + 5000);
taskProducer.produce(3, System.currentTimeMillis() + 20000);

System.out.println("等待任务执行===========");

//消费端从redis中消费任务
TaskConsumer taskConsumer = new TaskConsumer();
taskConsumer.consumer();
}
}

class TaskProducer {

public void produce(Integer taskId, long exeTime) {
System.out.println("加入任务, taskId: " + taskId + ", exeTime: " + exeTime + ", 当前时间:" + LocalDateTime.now());
RedisOps.getJedis().zadd(RedisOps.key, exeTime, String.valueOf(taskId));
}
}

class TaskConsumer {

public void consumer() {

Executors.newSingleThreadExecutor().submit(new Runnable() {
@Override
public void run() {
while (true) {
Set taskIdSet = RedisOps.getJedis().zrangeByScore(RedisOps.key, 0, System.currentTimeMillis(), 0, 1);
if (taskIdSet == null || taskIdSet.isEmpty()) {
//System.out.println("没有任务");
} else {
taskIdSet.forEach(id -> {
long result = RedisOps.getJedis().zrem(RedisOps.key, id);
if (result == 1L) {
System.out.println("从延时队列中获取到任务,taskId:" + id + " , 当前时间:" + LocalDateTime.now());
}
});
}
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}
}

執行結果如下:

延時任務的幾種實現方式 4

相比前兩種實現方式,使用 Redis 可以將數據持久化到磁盤,規避了數據丟失的風險,並且支持分佈式,避免了單點故障。

五、MQ 延時隊列實現

以 RabbitMQ 為例,它本身並沒有直接支持延時隊列的功能,但是通過一些特性,我們可以達到實現延時隊列的效果。

​RabbitMQ 可以為 Queue 設置 TTL,,到了過期時間沒有被消費的消息將變為死信——Dead Letter。我們還可以為Queue 設置死信轉發 x-dead-letter-exchange,過期的消息可以被路由到另一個 Exchange。下圖說明了這個流程,生產者通過不同的RoutingKey 發送不同過期時間的消息,多個隊列分別消費並產生死信後被路由到exe-dead-exchange,再有一些隊列綁定到這個exchange,從而進行不同業務邏輯的消費。

延時任務的幾種實現方式 6

​在 RabbitMQ 界面操作如下:

1、在 g_normal_exchange 發送測試消息

延時任務的幾種實現方式 7

​2. 隊列 g_queue_10s 綁定到 g_normal_exchange,並設置 x-message-ttl 為 10s 過期,x-dead-letter-exchange 為 g_exe_dead_exchange,可以看到消息到達後,過了 10s 之後消息被路由到gexedead_exchange

延時任務的幾種實現方式 8

​ 3. 綁定到 g_exe_dead_exchange 的隊列 g_exe_10s_queue 消費到了這條消息

延時任務的幾種實現方式 9

​使用 MQ 實現的方式,支持分佈式,並且消息支持持久化,在業內應用比較多,它的缺點是每種間隔時間的場景需要分別建立隊列。

六、總結

通過上面不同實現方式的比較,可以很明顯的看出各個方案的優缺點,在分佈式系統中我們會優先考慮使用 Redis 和 MQ 的實現方式。

​在需求開發中實現一個功能的方式多種多樣,需要我們進行多維度的比較,才能選擇出合理的、可靠的、高效的並且適合自己業務的解決方案。