Categories
程式開發

深入淺出Kotlin協程


本文要點

  • JVM並沒有提供對協程的原生支持
  • Kotlin在編譯器中實現協程是通過將其轉換為一個狀態機實現的
  • Kotlin為實現使用了一個關鍵字,其餘都是通過庫來完成的
  • Kotlin使用連續傳遞風格(Continuation Passing Style,CPS)來實現協程
  • 協程會使用到Dispatcher,所以在JavaFX、Android、Swing等場景下,用法略有差異。

儘管協程並不是一個新的話題,但它是一個很吸引人的話題。正如其他地方的文檔所述,協程在這些年裡被重新發現了很多次,特別是當需要某種形式的輕量級線程和/或尋找“回調地獄”的解決方案時。

最近,協程已經成為JVM上反應式編程的一個流行的替代方案。像RxJavaReactor項目這樣的框架為客戶端提供了一種漸進式處理傳入信息的方式,並對節流和並行提供了廣泛的支持。但是,我們必須圍繞反應流的函數式操作來重構代碼,在許多情況下,這麼做的成本大於收益

舉例來講,這是Android社區需要更簡單替代方案的原因。 Kotlin語言引入協程作為實驗性的特性以滿足該需求,在一些改進之後,它們成為該語言1.3版本的官方特性。 Kotlin協程的應用已經從UI開發擴展到服務器端框架(如Spring 5中添加的支持),甚至包括像Arrow(通過Arrow Fx)這樣的函數式框架。

理解協程的挑戰

令人遺憾的是,理解協程並不是一件容易的任務。儘管有許多Kotlin專家所做的關於協程的演講,其中很多都是啟發性的並且包含非常多的信息,但是想要簡單地知道協程是什麼(或者它該怎樣使用)並不容易。你可能會說協程就是並行編程的等價品。

導致該問題的部分原因在於底層實現。在Kotlin協程中,編譯器只實現了一個suspend關鍵字,其他的事情都是由協程庫處理的。因此,Kotlin協程非常強大和靈活,但在結構上卻不那麼固定。對於初學者來說,這是一個學習的障礙,因為他們學習的最好方法是遵循堅實的指導方針和嚴格的原則。本文會從底層開始介紹協程,希望能夠為讀者提供這樣的基礎知識。

我們的示例應用(服務器端)

我們的應用程序將建立在安全有效地對RESTful服務進行多次調用的規範性(canonical)問題之上。我們將播放Where’s Waldo的一個文本版本——在這個版本中,用戶遵循一系列的名稱,直到他們找到“Waldo”。

下面是完整的RESTful服務,它是使用Http4k編寫的。 Http4k是由Marius Eriksen撰寫的著名論文中描述的函數式服務器架構的Kotlin版本。該實現存在許多其他語言的版本,包括Scala(Http4s)和Java 8或更高版本(Http4j)。

這裡只有一個端點,它通過Map實現了一個名字的鏈。給定一個名字,我們要么以200狀態碼返回匹配的值,要么以404狀態碼返回一條錯誤信息。

fun main() {
   val names = mapOf(
       "Jane" to "Dave",
       "Dave" to "Mary",
       "Mary" to "Pete",
       "Pete" to "Lucy",
       "Lucy" to "Waldo"
   )
   val lookupName = { request: Request ->
       val name = request.path("name")
       val headers = listOf("Content-Type" to "text/plain")
       val result = names[name]
       if (result != null) {
           Response(OK)
               .headers(headers)
               .body(result)
       } else {
           Response(NOT_FOUND)
               .headers(headers)
               .body("No match for $name")
       }
   }
   routes(
       "/wheresWaldo" bind routes(
           "/{name:.*}" bind Method.GET to lookupName
       )
   ).asServer(Netty(8080))
       .start()
}

實際上,我們希望客戶端發送如下的請求鏈:

深入淺出Kotlin協程 1

我們的示例應用(客戶端)

我們的客戶端應用將會基於JavaFX庫來創建用戶界面。但是,為了簡化我們的任務並避免不必要的細節,我們將會使用TornadoFX,它在JavaFX之上提供了一個Kotlin DSL。

如下是客戶端視角的完整定義:

class HelloWorldView: View("Coroutines Client UI") {
   private val finder: HttpWaldoFinder by inject()
   private val inputText = SimpleStringProperty("Jane")
   private val resultText = SimpleStringProperty("")
   override val root = form {
       fieldset("Lets Find Waldo") {
           field("First Name:") {
               textfield().bind(inputText)
               button("Search") {
                   action {
                       println("Running event handler".addThreadId())
                       searchForWaldo()
                   }
               }
           }
           field("Result:") {
               label(resultText)
           }
       }
   }
   private fun searchForWaldo() {
       GlobalScope.launch(Dispatchers.Main) {
           println("Doing Coroutines".addThreadId())
           val input = inputText.value
           val output = finder.wheresWaldo(input)
           resultText.value = output
       }
   }
}class HelloWorldView: View("Coroutines Client UI") {   private val finder: HttpWaldoFinder by inject()   private val inputText = SimpleStringProperty("Jane")   private val resultText = SimpleStringProperty("")   override val root = form {       fieldset("Lets Find Waldo") {           field("First Name:") {               textfield().bind(inputText)               button("Search") {                   action {                       println("Running event handler".addThreadId())                       searchForWaldo()                   }               }           }           field("Result:") {               label(resultText)           }       }   }   private fun searchForWaldo() {       GlobalScope.launch(Dispatchers.Main) {           println("Doing Coroutines".addThreadId())           val input = inputText.value           val output = finder.wheresWaldo(input)           resultText.value = output       }   }}

我們還會使用如下的輔助函數作為String類型的擴展:

fun String.addThreadId() = "$this on thread ${Thread.currentThread().id}"

在運行的時候,UI如下所示:

深入淺出Kotlin協程 2

當用戶點擊按鈕的時候,我們會創建一個新的協程,並通過“HttpWaldoFinder”類型的服務對象訪問RESTful端點。

Kotlin協程存在於一個“CoroutineScope”中,而後者又會反過來和某種Dispatcher關聯,Dispatcher代表了底層的並發模型。並發模型通常是一個線程池,但是會有所差異。

具體哪些Dispatcher可用取決於Kotlin代碼運行的環境。 Main Dispatcher表示UI庫的事件處理線程,因此(在JVM上)只能在Android、JavaFX和Swing中使用。最初,Kotlin Native根本不支持協程的多線程處理,但這種情況正在改變。在服務器端,我們可以自己引入協程,但是在越來越多的情況中,協程默認就是可用的,比如在Spring 5中。

在調用掛起(suspending)方法之前,我們必須將協程、“CoroutineScope”和“Dispatche”準備就緒。如果這是初始調用的話(如上面的代碼所示),我們可以通過“協程構造者”函數,如“launch”和“async”,啟動該過程。

不管是調用協程構建者函數,還是像“withContext”這樣的作用域函數,都會創建一個新的“CoroutineScope”。在該作用域中,任務體現為“Job”實例的層級結構。

它們有一些很有意思的屬性,即:

  • Job會等待其塊內所有協程完成後才能完成自己。
  • 取消一個Job會導致其所有的子Job都被取消。
  • 子Job的失敗或取消會傳播至父Job。

這樣設計的目的是避免並發編程中的常見問題,比如殺死父Job的時候沒有終結其子Job。

訪問REST端點的服務

如下是HttpWaldoFinder服務的完整代碼:

class HttpWaldoFinder : Controller(), WaldoFinder {
   override suspend fun wheresWaldo(starterName: String): String {
       val firstName = fetchNewName(starterName)
       println("Found $firstName name".addThreadId())
       val secondName = fetchNewName(firstName)
       println("Found $secondName name".addThreadId())
       val thirdName = fetchNewName(secondName)
       println("Found $thirdName name".addThreadId())
       val fourthName = fetchNewName(thirdName)
       println("Found $fourthName name".addThreadId())
       return fetchNewName(fourthName)
   }
   private suspend fun fetchNewName(inputName: String): String {
       val url = URI("http://localhost:8080/wheresWaldo/$inputName")
       val client = HttpClient.newBuilder().build()
       val handler = HttpResponse.BodyHandlers.ofString()
       val request = HttpRequest.newBuilder().uri(url).build()
       return withContext(Dispatchers.IO) {
           println("Sending HTTP Request for $inputName".addThreadId())
           client
               .send(request, handler)
               .body()
       }
   }
}

“fetchNewName”函數會接收一個已知的名字,並查詢端點獲取關聯的名字。這是通過使用“HttpClient”類型來實現的,該類型是從Java 11開始作為標準的。實際的HTTP GET會在一個新的子協程中運行,該子協程會使用IO Dispatcher。它的表現形式是一個為長時間運行的活動(如網絡調用)優化的線程池。

“wheresWaldo”函數會根據名字鏈執行五次,以便於(盡力)找到Waldo。我們隨後將會分解生成的字節碼,所以我們讓實現盡可能地簡單。我們感興趣的是,每次調用“fetchNewName”都會導致當前協程在子協程運行時被掛起。在這個特殊場景下,父Job運行在Main Dispatcher上,而子Job運行在IO Dispatcher上。因此,當子Job執行HTTP請求時,UI事件處理線程會被釋放出來處理與視圖的用戶交互。如下圖所示。

深入淺出Kotlin協程 3

IntelliJ會為我們展示何時發起掛起調用,因此會在協程間轉換控制。需要注意,如果我們沒有切換Dispatcher的話,那發起調用不一定會創建新的協程。當一個掛起函數調用另一個掛起函數時,可能會在相同的協程中繼續,如果我們真的希望保持在同一個線程上,那麼這的確就是我們想要的行為。

深入淺出Kotlin協程 4

當我們執行客戶端的時候,如下就是控制台的輸出:

深入淺出Kotlin協程 5

我們可以看到,在這個特殊的場景下,Main Dispatcher/UI事件處理器運行在17號線程上,而IO Dispatcher運行在線程池上,包括24號和26號線程。

開始我們的調查

借助IntelliJ自帶的字節碼分解工具,我們可以看出到底發生了什麼。另外,我們也可以使用JDK提供的標準“javap”工具。

深入淺出Kotlin協程 6

我們可以看到“HttpWaldoFinder”的方法改變了其簽名,所以它們可以接受一個Continuation對像作為其額外的參數,並且返回某種通用的對象。

public final class HttpWaldoFinder extends Controller implements WaldoFinder {
  public Object wheresWaldo(String a, Continuation b)
  final synthetic Object fetchNewName(String a, Continuation b)
}

現在,我們深入代碼看一下這些方法都添加了些什麼,並闡述“Continuation”是什麼以及現在返回的是什麼。

連續傳遞風格(Continuation Passing Style,CPS)

按照Kotlin標準化過程對協程提議的文檔描述,協程的實現是基於連續傳遞風格(Continuation Passing Style,CPS)的。會有一個continuation對象來存儲函數在掛起階段所需的狀態。

在本質上,掛起函數的每個局部變量都會成為continuation的一個字段。另外,還需要創建字段來存儲所有的參數和當前對象(如果函數是方法的話)。所以,一個掛起方法如果有四個參數和五個局部變量的話,continuation至少要有10個字段。

在“HttpWaldoFinder”的“wheresWaldo”方法中,有一個參數和四個本地變量,所以我們預期continuation實現類型會有六個字段。如果我們將Kotlin編譯器生成的字節碼分解成Java源碼的話,就會發現事實確實如此:

$continuation = new ContinuationImpl($completion) {
  Object result;
  int label;
  Object L$0;
  Object L$1;
  Object L$2;
  Object L$3;
  Object L$4;
  Object L$5;
  @Nullable
  public final Object invokeSuspend(@NotNull Object $result) {
     this.result = $result;
     this.label |= Integer.MIN_VALUE;
     return HttpWaldoFinder.this.wheresWaldo((String)null, this);
  }
};

鑑於所有的字段都是Object類型的,所以它們該如何使用並不明顯。隨著進一步探索,我們將會看到:

  • “L$0”持有對“HttpWaldoFinder”實例的引用。它始終都會存在。
  • “L$1”持有“starterName”參數的值。它始終都會存在。
  • “L$2”到“L$5”持有本地變量的值。隨著代碼的執行,它們將會漸進式地填充進來。 “L$2”會持有“firstName”的值,以此類推。

我們還有額外的字段存儲最終結果,有一個名為“label”的有趣的整型字段。

掛起還是不​​掛起——這是一個問題

當我們檢查生成的代碼時,需要記住它要處理兩個場景。每當一個掛起的函數調用另一個函數時,它可能會掛起當前的協程(這樣另外一個函數可以在相同的線程上運行),也可能在當前協程會繼續執行。

我們考慮一個從數據存儲中讀取值的掛起函數。當I/O發生的時候,它很可能會被掛起,但是它也可能會緩存結果。後續調用可以同步返回緩存的值,不需要任何的掛起。 Kotlin編譯器所生成的代碼必須要同時支持這兩種路徑。

Kotlin會調整每個掛起函數的返回類型,這樣的話,它要么返回真正的結果,要么返回特殊的值COROUTINE_SUSPENDED。如果是後者的話,當前的協程會被掛起。這也是為什麼掛起函數的返回類型從結果類型變成了“Object”。

在我們的樣例應用中,“wheresWaldo”會重複調用“fetchNewName”。在理論上,每次這樣的調用都可能掛起或不掛起當前的協程。按照我們編寫“fetchNewName”的方式,可以知道,掛起始終都會發生。但是,為了理解生成的代碼,我們必須記住它需要處理所有的可能性。

大的Switch語句和Label

如果我們進一步查看分解後的代碼,會發現一個隱藏在多個嵌套label中的switch語句。這是一個狀態機的實現,用來在wheresWaldo()方法中控制不同的掛起點。下面是整體的結構:

// 程序清单1:生成的switch语句和label
String firstName;
String secondName;
String thirdName;
String fourthName;
Object var11;
Object var10000;
label48: {
  label47: {
     label46: {
        Object $result = $continuation.result;
        var11 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch($continuation.label) {
        case 0:
            // 省略代码
        case 1:
            // 省略代码
        case 2:
            // 省略代码
        case 3:
            // 省略代码
        case 4:
            // 省略代码
        case 5:
            // 省略代码
        default:
           throw new IllegalStateException(
                "call to 'resume' before 'invoke' with coroutine");
        } // 结束 switch
        // 省略代码
    } // 结束 label 46
    // 省略代码
  } // 结束 label 47
  // 省略代码
} // 结束 label 48
// 省略代码

我們現在可以看到continuation中“label”字段的目的了。當完成“wheresWaldo”的不同階段時,我們將會變更“label”中的值。嵌套的label代碼塊中包含了原始Kotlin代碼裡面掛起點之間的代碼塊。這個“label”值允許重新進入該代碼,跳到最後掛起的地方(適當的case語句),以便於從continuation中抽取數據,然後跳轉到正確的label代碼塊。

但是,如果所有的掛起點都沒有真正掛起的話,整個代碼塊可以同步執行。在生成的代碼中,我們經常會看到這樣的片段:

// 程序清单2:确定当前的协程是否应该挂起
if (var10000 == var11) {
  return var11;
}

從上面我們可以看到,“var11”被設置成了CONTINUATION_SUSPENDED的值,而“var10000”保存了對另一個掛起函數的調用的返回值。因此,當掛起發生時,代碼將返回(稍後會重新進入),如果沒有發生掛起,則代碼將通過切換到適當的label塊繼續執行函數的下一部分。

再次強調,請記住,生成的代碼不能假設所有調用都將掛起,或者所有調用都將繼續使用當前的協程。它必須能夠處理任何可能的組合。

跟踪執行

當我們開始執行時,continuation中“label”的值將會被置為零。如下是對應的switch語句分支:

// 程序清单3:switch的第一个分支
case 0:
  ResultKt.throwOnFailure($result);
  $continuation.L$0 = this;
  $continuation.L$1 = starterName;
  $continuation.label = 1;
  var10000 = this.fetchNewName(starterName, $continuation);
  if (var10000 == var11) {
     return var11;
  }
  break;

我們將實例和參數存儲到了continuation對像中,然後將continuation對像傳遞給“fetchNewName”。如前文所述,編譯器所生成的“fetchNewName”版本要么返回實際的結果,要么返回COROUTINE_SUSPENDED值。

如果協程掛起的話,那麼我們會從函數中返回,並且當我們恢復的時候,會跳入“case 1”分支。如果我們繼續使用當前的協程,那麼會跳出switch到某一個label代碼塊,進入如下的代碼:

// 程序清单4:第二次调用“fetchNewName”
firstName = (String)var10000;
secondName = UtilsKt.addThreadId("Found " + firstName + " name");
boolean var13 = false;
System.out.println(secondName);
$continuation.L$0 = this;
$continuation.L$1 = starterName;
$continuation.L$2 = firstName;
$continuation.label = 2;
var10000 = this.fetchNewName(firstName, $continuation);
if (var10000 == var11) {
  return var11;
}

因為我們知道“var10000”包含了我們期望的返回值,所以我們可以將其轉換成正確的類型並存儲到局部變量“firstName”中。隨後,生成的代碼會使用“secondName”來存儲線程id連接的結果,並且將其打印了出來。

我們更新了continuation中的字段,添加了服務器檢索到的值。注意,“label”的值現在已經是2了。隨後,我們第三次調用“fetchNewName”。

第三次調用“fetchNewName”——無掛起

我們需要再次基於“fetchNewName”返回的值做出選擇,如果返回的值是COROUTINE_SUSPENDED的話,我們會從當前函數返回。當下一次調用時,我們要遵循switch的“case 2”分支。

如果我們繼續使用當前協程的話,那麼將會執行如下的代碼塊。我們可以看到它與上面的代碼是相同的,只不過我們現在要有更多的數據存儲到continuation中。

// 程序清单4:第三次调用“fetchNewName”
secondName = (String)var10000;
thirdName = UtilsKt.addThreadId("Found " + secondName + " name");
boolean var14 = false;
System.out.println(thirdName);
$continuation.L$0 = this;
$continuation.L$1 = starterName;
$continuation.L$2 = firstName;
$continuation.L$3 = secondName;
$continuation.label = 3;
var10000 = this.fetchNewName(secondName, (Continuation)$continuation);
if (var10000 == var11) {
  return var11;
}

這種模式在後續的調用中會重複(假定始終沒有返回COROUTINE_SUSPENDED),直到到達終點為止。

第三次調用“fetchNewName”——帶有掛起

或者,如果協程已經被掛起的話,那麼將會運行如下的代碼塊:

// 程序清单5:switch的第三个分支
case 2:
  firstName = (String)$continuation.L$2;
  starterName = (String)$continuation.L$1;
  this = (HttpWaldoFinder)$continuation.L$0;
  ResultKt.throwOnFailure($result);
  var10000 = $result;
  break label46;

我們將continuation中的值抽取到函數的局部變量中。隨後使用一個label形式的break使執行跳轉至前述的程序清單4中。所以最終,我們會在相同的地方結束。

總結執行過程

現在,我們可以重新看一下程序清單的代碼結構,並在整體上描述一下每個區域中都發生了什麼:

// 程序清单6:深度解析生成的switch语句和label
String firstName;
String secondName;
String thirdName;
String fourthName;
Object var11;
Object var10000;
label48: {
  label47: {
     label46: {
        Object $result = $continuation.result;
        var11 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch($continuation.label) {
       case 0:
            // 将label设置为1,如果返回挂起的话,第一次调用“fetchNewName”
            // 否则,从switch中break退出
        case 1:
            // 从continuation中抽取参数
            // 从switch中break退出
        case 2:
            // 从continuation中抽取参数和第一个结果
            // 跳转到外边的“label46”
        case 3:
            // 从continuation中抽取参数,第一个和第二个结果
            // 跳转到外边的“label47”
        case 4:
            // 从continuation中抽取参数,第一个、第二个和第三个结果
            // 跳转到外边的“label48”
        case 5:
            // 从continuation中抽取参数,第一个、第二个、第三个和第四个结果
            // 返回最后的结果
        default:
           throw new IllegalStateException(
                "call to 'resume' before 'invoke' with coroutine");
        } // 结束 switch
        // 存储参数和第一个结果到continuation中
        // 如果返回挂起的话,将label设置为2并对“fetchNewName”进行第二次调用
        // 否则的话继续进行
    } // 结束label 46
        // 存储参数、第一个结果和第二个结果到continuation中
        // 如果返回挂起的话,将label设置为3并对“fetchNewName”进行第三次调用
        // 否则的话继续进行
  } // 结束label 47
        //  存储参数、第一个结果、第二个结果和第三个结果到continuation中
        //  如果返回挂起的话,将label设置为4并对“fetchNewName”进行第四次调用
        // 否则的话继续进行
} // 结束label 48
// 存储参数、第一个结果、第二个结果、第三个结果和第四个结果到continuation中
// 将label设置为5并对“fetchNewName”进行第五次调用
// 返回最终结果或COROUTINE_SUSPENDED

結果

這個代碼庫理解起來並不簡單。我們分析了字節碼分解得到的Java代碼,這些字節碼是由Kotlin編譯器中的代碼生成器所產生的。這個代碼生成器的輸出在設計時考慮的是效率和最小化,而不是可理解性

但是,我們可以得出一些有用的結論:

  1. 並沒有什麼魔法。當開發人員第一次開始學習協程時,很容易會認為有些特殊的“魔法”將所有的這些事情連接在了一起。我們可以看到,生成的代碼只使用了一些過程式編程的基本構建塊,比如條件語句和帶有label的break。
  2. 實現是基於continuation的。在最初的KEEP提議中,函數掛起和恢復的方法是將函數的狀態緩存在一個對像中。因此,對於每個掛起函數,編譯器將創建一個包含N個字段的continuation類型,其中N是參數的數量加上字段數再加上3。最後的三個字段保存當前對象、最終結果和索引。
  3. 執行始終遵循一個標準的模式。如果從掛起中恢復,那麼我們要使用continuation的“label”字段跳轉到switch語句的適當分支。在這個分支中,我們從continuation對象檢索到目前為止已經找到的數據,然後使用一個帶有label的break跳轉到沒有發生掛起的代碼,這些代碼本來是要直接執行的。

作者簡介

Garth Gilmour是Instil的學習主管(Head of Learning)。早在1999年,他就放棄了全職的開發工作,開始講授C++給C程序員,然後講授Java給C++程序員,然後是講授C#給Java程序員,現在他什麼都教,但更喜歡Kotlin相關的工作。如果計算學員數量的話,很久之前就超過1000人了。他是20多門課程的作者,經常在技術會議上發言,在國內和國際會議上演講,共同組織了貝爾法斯特BASH的開發人員系列活動,最近成立了貝爾法斯特Kotlin用戶組。不在白板之前的時候,他還擔任近身格鬥和舉重的教練。

Eamonn Boyle有超過15年的開發、架構和團隊領導經驗。在過去的4年裡,他一直擔任全職培訓師和教練,為各種各樣的客戶撰寫和講授各種主題的課程。其中包括範式和技術,從核心語言技能、框架到工具和過程。他還在一系列會議、活動和技術聚會上發表演講並舉辦workshop,其中包括Goto和KotlinConf。

原文鏈接:

A Bottom-Up View of Kotlin Coroutines