Categories
程式開發

快速掌握並發編程—線程池的原理和實戰


快速掌握並發編程---線程池的原理和實戰 1

上圖是裝水的池子——水池。

流行池化技術,那麼到底什麼是池化技術呢?

池化技術簡單點來說,就是提前保存大量的資源,以備不時之需。在機器資源有限的情況下,使用池化技術可以大大的提高資源的利用率,提升性能等。

在編程領域,比較典型的池化技術有:線程池、連接池、內存池、對像池等。

案例

我們系統裡經常會涉及到一些異步處理,比如說給用戶發個站內信、某項業務搞完了給用戶發個短信、發個推送等這種異步業務處理。 (並不是每個系統都會使用消息隊列之類的第三方框架),所以,針對上面的舉例場景,如果沒有線程池的說法,將會:

發站內信啟動一個線程,發完結束線程。發個短信啟動一個線程,發完結束線程。發個推送啟動一個線程,發完結束線程….

有沒有發現,我們會不斷的啟動線程、銷毀線程。

還拿上邊的例子說,如果我們使用線程池的方式的話,可以實現指定線程的數量,這樣的話就算再多的數據需要入庫,只需要排隊等待線程池的線程即可,也就不用一直不斷的創建線程銷毀線程,就不會出現線程池過多而消耗系統資源的情況,當然這只是意見簡單的場景。

快速掌握並發編程---線程池的原理和實戰 2

說到這裡,有人要說了線程不是攜帶資源的最小單位,操作系統的書籍中還給我們說了線程之間的切換消耗很小嗎?

雖然如此,線程是一種輕量級的工具(或者稱之為:輕量級進程),但其創建和關閉依然需要花費時間,如果為了一個很簡單的任務就去創建一個線程,很有可能出現創建和銷毀線程所佔用的時間大於該線程真實工作所消耗的時間,反而得不償失。

定義

為了避免系統頻繁的創建和銷毀線程,我們可以將創建的線程進行複用。數據庫中的數據庫連接池也是此意。

快速掌握並發編程---線程池的原理和實戰 3

在線程池中總有那麼幾個活躍的線程,也有一定的最大值限制,一個業務使用完線程之後,不是立即銷毀而是將其放入到線程池中,從而實現線程的複用。

簡而言之:創建線程變成了從線程池獲取空閒的線程,關閉線程變成了向池子中歸還線程。

線程池的優點

Java 中的線程池是運用場景最多的並發框架,幾乎所有需要異步或併發執行任務的程序都可以使用線程池,Spring、Dubbo、Mybatis等等框架中都有大量的使用線程池。

那線程池到底有哪些好處呢?

在開發過程中,合理地使用線程池能夠帶來幾個好處:

降低系統資源消耗,通過重用已存在的線程,降低線程創建和銷毀造成的消耗;提高系統響應速度,當有任務到達時,通過復用已存在的線程,無需等待新線程的創建便能立即執行;方便線程並發數的管控。因為線程若是無限制的創建,可能會導致內存佔用過多而產生OOM(Out Of Memory),並且會造成CPU過度切換(CPU切換線程是有時間成本的,需要保持當前執行線程的現場,並恢復要執行線程的現場)。提供更強大的功能,延時定時線程池。

線程池的原理

快速掌握並發編程---線程池的原理和實戰 4

線程的複用

線程池將線程和任務進行解耦,線程是線程,任務是任務,擺脫了之前通過Thread 創建線程時的一個線程必須對應一個任務的限制。

在線程池中,同一個線程可以從阻塞隊列中不斷獲取新任務來執行。

其核心原理在於線程池對Thread 進行了封裝,並不是每次執行任務都會調用Thread.start() 來創建新線程,而是讓每個線程去執行一個“循環任務”,在這個“循環任務”中不停的檢查是否有任務需要被執行,如果有則直接執行。

也就是調用任務中的run 方法,將run 方法當成一個普通的方法執行,通過這種方式將只使用固定的線程就將所有任務的run 方法串聯起來。

JDK 自帶線程池

JDK 提供了java.util.concurrent.Executor接口,

可以讓我們有效的管理和控制我們的線程,其實質也就是一個線程池。

public interface Executor {
    void execute(Runnable command);
}

看看Executor實現類就知道,線程池使用的地方是相當多,netty、Spring、Google等。

快速掌握並發編程---線程池的原理和實戰 5

但是我們這篇講的是Executor 的子接口ExecutorService

public interface ExecutorService extends Executor {
    void shutdown();
    List shutdownNow();
    boolean isShutdown();    
    boolean isTerminated();   
    boolean awaitTermination(long timeout, TimeUnit unit)  throws InterruptedException;   
     Future submit(Callable task);    
     Future submit(Runnable task, T result);    
    Future submit(Runnable task);
     List invokeAll(Collection